Skip to content

Commit

Permalink
fix: test for nan when calculating fail/success in combiner (GoogleCl…
Browse files — browse the repository at this point in the history
  • Loading branch information
ajwelch4 committed Feb 15, 2022
1 parent 100b3ea commit 6252bec
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 1 deletion.
7 changes: 6 additions & 1 deletion data_validation/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ def _calculate_difference(field_differences, datatype, validation, is_value_comp
).cast("float64")

th_diff = (pct_difference.abs() - pct_threshold).cast("float64")
status = ibis.case().when(th_diff > 0.0, "fail").else_("success").end()
status = (
ibis.case()
.when(th_diff.isnan() | (th_diff > 0.0), "fail")
.else_("success")
.end()
)

return (
difference.name("difference"),
Expand Down
230 changes: 230 additions & 0 deletions tests/unit/test_combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,3 +580,233 @@ def test_generate_report_with_group_by(
.reindex(sorted(expected.columns), axis=1)
)
pandas.testing.assert_frame_equal(report, expected)


@pytest.mark.parametrize(
    ("source_df", "target_df", "run_metadata", "expected"),
    (
        # Case 1: source sum is a real number, target sum is NaN.
        (
            pandas.DataFrame({"sum": [8093]}),
            pandas.DataFrame({"sum": [_NAN]}),
            metadata.RunMetadata(
                validations={
                    "sum": metadata.ValidationMetadata(
                        source_table_name="test_source",
                        source_table_schema="bq-public.source_dataset",
                        source_column_name="test_col",
                        target_table_name="test_target",
                        target_table_schema="bq-public.target_dataset",
                        target_column_name="test_col",
                        validation_type="Column",
                        aggregation_type="sum",
                        threshold=0.0,
                    ),
                },
                start_time=datetime.datetime(1998, 9, 4, 7, 30, 1),
                end_time=None,
                labels=[("name", "test_label")],
                run_id="test-run",
            ),
            pandas.DataFrame(
                {
                    "run_id": ["test-run"],
                    "start_time": [datetime.datetime(1998, 9, 4, 7, 30, 1)],
                    "end_time": [datetime.datetime(1998, 9, 4, 7, 31, 42)],
                    "source_table_name": ["bq-public.source_dataset.test_source"],
                    "source_column_name": ["test_col"],
                    "target_table_name": ["bq-public.target_dataset.test_target"],
                    "target_column_name": ["test_col"],
                    "validation_type": ["Column"],
                    "aggregation_type": ["sum"],
                    "validation_name": ["sum"],
                    "source_agg_value": ["8093"],
                    "target_agg_value": ["nan"],
                    "group_by_columns": [None],
                    "difference": [_NAN],
                    "pct_difference": [_NAN],
                    "pct_threshold": [0.0],
                    "status": ["fail"],
                    "labels": [[("name", "test_label")]],
                }
            ),
        ),
        # Case 2: mirror of case 1 -- source sum is NaN, target sum is a number.
        (
            pandas.DataFrame({"sum": [_NAN]}),
            pandas.DataFrame({"sum": [8093]}),
            metadata.RunMetadata(
                validations={
                    "sum": metadata.ValidationMetadata(
                        source_table_name="test_source",
                        source_table_schema="bq-public.source_dataset",
                        source_column_name="test_col",
                        target_table_name="test_target",
                        target_table_schema="bq-public.target_dataset",
                        target_column_name="test_col",
                        validation_type="Column",
                        aggregation_type="sum",
                        threshold=0.0,
                    ),
                },
                start_time=datetime.datetime(1998, 9, 4, 7, 30, 1),
                end_time=None,
                labels=[("name", "test_label")],
                run_id="test-run",
            ),
            pandas.DataFrame(
                {
                    "run_id": ["test-run"],
                    "start_time": [datetime.datetime(1998, 9, 4, 7, 30, 1)],
                    "end_time": [datetime.datetime(1998, 9, 4, 7, 31, 42)],
                    "source_table_name": ["bq-public.source_dataset.test_source"],
                    "source_column_name": ["test_col"],
                    "target_table_name": ["bq-public.target_dataset.test_target"],
                    "target_column_name": ["test_col"],
                    "validation_type": ["Column"],
                    "aggregation_type": ["sum"],
                    "validation_name": ["sum"],
                    "source_agg_value": ["nan"],
                    "target_agg_value": ["8093"],
                    "group_by_columns": [None],
                    "difference": [_NAN],
                    "pct_difference": [_NAN],
                    "pct_threshold": [0.0],
                    "status": ["fail"],
                    "labels": [[("name", "test_label")]],
                }
            ),
        ),
        # Case 3: count validation (no column names) with a NaN target count.
        (
            pandas.DataFrame({"count": [1]}),
            pandas.DataFrame({"count": [_NAN]}),
            metadata.RunMetadata(
                validations={
                    "count": metadata.ValidationMetadata(
                        source_table_name="test_source",
                        source_table_schema="bq-public.source_dataset",
                        source_column_name=None,
                        target_table_name="test_target",
                        target_table_schema="bq-public.target_dataset",
                        target_column_name=None,
                        validation_type="Column",
                        aggregation_type="count",
                        threshold=0.0,
                    ),
                },
                start_time=datetime.datetime(1998, 9, 4, 7, 30, 1),
                end_time=None,
                labels=[("name", "test_label")],
                run_id="test-run",
            ),
            pandas.DataFrame(
                {
                    "run_id": ["test-run"],
                    "start_time": [datetime.datetime(1998, 9, 4, 7, 30, 1)],
                    "end_time": [datetime.datetime(1998, 9, 4, 7, 31, 42)],
                    "source_table_name": ["bq-public.source_dataset.test_source"],
                    "source_column_name": [None],
                    "target_table_name": ["bq-public.target_dataset.test_target"],
                    "target_column_name": [None],
                    "validation_type": ["Column"],
                    "aggregation_type": ["count"],
                    "validation_name": ["count"],
                    "source_agg_value": ["1"],
                    "target_agg_value": ["nan"],
                    "group_by_columns": [None],
                    "difference": [_NAN],
                    "pct_difference": [_NAN],
                    "pct_threshold": [0.0],
                    "status": ["fail"],
                    "labels": [[("name", "test_label")]],
                }
            ),
        ),
        # Case 4: two validations in one run -- a count within its 30%
        # threshold (expected "success") next to a sum whose target is NaN
        # (expected "fail").
        (
            pandas.DataFrame({"count": [8], "sum__ttteeesssttt": [-1]}),
            pandas.DataFrame({"count": [9], "sum__ttteeesssttt": [_NAN]}),
            metadata.RunMetadata(
                validations={
                    "count": metadata.ValidationMetadata(
                        source_table_name="test_source",
                        source_table_schema="bq-public.source_dataset",
                        source_column_name=None,
                        target_table_name="test_target",
                        target_table_schema="bq-public.target_dataset",
                        target_column_name=None,
                        validation_type="Column",
                        aggregation_type="count",
                        threshold=30.0,
                    ),
                    "sum__ttteeesssttt": metadata.ValidationMetadata(
                        source_table_name="test_source",
                        source_table_schema="bq-public.source_dataset",
                        source_column_name="test_col",
                        target_table_name="test_target",
                        target_table_schema="bq-public.target_dataset",
                        target_column_name="ttteeesssttt_col",
                        validation_type="Column",
                        aggregation_type="sum",
                        threshold=0.0,
                    ),
                },
                start_time=datetime.datetime(1998, 9, 4, 7, 30, 1),
                end_time=None,
                labels=[("name", "test_label")],
                run_id="test-run",
            ),
            pandas.DataFrame(
                {
                    "run_id": ["test-run"] * 2,
                    "start_time": [datetime.datetime(1998, 9, 4, 7, 30, 1)] * 2,
                    "end_time": [datetime.datetime(1998, 9, 4, 7, 31, 42)] * 2,
                    "source_table_name": [
                        "bq-public.source_dataset.test_source",
                        "bq-public.source_dataset.test_source",
                    ],
                    "source_column_name": [None, "test_col"],
                    "target_table_name": [
                        "bq-public.target_dataset.test_target",
                        "bq-public.target_dataset.test_target",
                    ],
                    "target_column_name": [None, "ttteeesssttt_col"],
                    "validation_type": ["Column", "Column"],
                    "aggregation_type": ["count", "sum"],
                    "validation_name": ["count", "sum__ttteeesssttt"],
                    "source_agg_value": ["8", "-1"],
                    "target_agg_value": ["9", "nan"],
                    "group_by_columns": [None, None],
                    "difference": [1.0, _NAN],
                    "pct_difference": [12.5, _NAN],
                    "pct_threshold": [30.0, 0.0],
                    "status": ["success", "fail"],
                    "labels": [[("name", "test_label")]] * 2,
                }
            ),
        ),
    ),
)
def test_generate_report_with_nan_agg_value(
    module_under_test, patch_datetime_now, source_df, target_df, run_metadata, expected
):
    """Check that a NaN aggregate on either side yields a "fail" status.

    Each parametrized case supplies source/target frames where at least one
    aggregate value is NaN, together with the run metadata and the report
    frame that ``generate_report`` is expected to produce.

    NOTE(review): ``patch_datetime_now`` is requested only for its side
    effect; presumably it pins "now" so that ``end_time`` matches the
    expected 07:31:42 timestamp -- confirm against the fixture definition.
    """
    client = ibis.backends.pandas.connect(
        {"test_source": source_df, "test_target": target_df}
    )
    source_table = client.table("test_source")
    target_table = client.table("test_target")
    report = module_under_test.generate_report(
        client,
        run_metadata,
        source=source_table,
        target=target_table,
    )

    def _normalized(frame):
        # Row and column order are not part of the contract being tested, so
        # order rows by validation name and columns alphabetically before
        # comparing (column sort: https://stackoverflow.com/a/11067072/101923).
        by_name = frame.sort_values("validation_name").reset_index(drop=True)
        return by_name.reindex(sorted(frame.columns), axis=1)

    pandas.testing.assert_frame_equal(_normalized(report), _normalized(expected))

0 comments on commit 6252bec

Please sign in to comment.