Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Logic to add allow-list to support datatype matching with a provided list in case of mismatched datatypes between source and target #643

Merged
merged 7 commits into from
Jan 16, 2023
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ data-validation (--verbose or -v) (--log-level or -ll) validate schema
[--filter-status or -fs STATUSES_LIST]
Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned.
[--exclusion-columns or -ec EXCLUSION_COLUMNS]
Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b.
Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b.

[--allow-list or -al ALLOW_LIST]
Comma separated list of data-type mappings of source and destination data sources which will be validated in case of missing data types in destination data source. e.g.: "decimal(4,2):decimal(5,4),string[non-nullable]:string"
```

#### Custom Query Column Validations
Expand Down
3 changes: 2 additions & 1 deletion data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ def build_config_managers_from_args(args):
config_manager.append_exclusion_columns(
[col.casefold() for col in exclusion_columns]
)

if args.allow_list is not None:
config_manager.append_allow_list(args.allow_list)
configs.append(config_manager)

return configs
Expand Down
5 changes: 5 additions & 0 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ def _configure_schema_parser(schema_parser):
"-ec",
help="Comma separated list of columns 'col_a,col_b' to be excluded from the schema validation",
)
schema_parser.add_argument(
"--allow-list",
"-al",
help="Comma separated list of datatype mappings due to incompatible datatypes in source and target. e.g: decimal(12,2):decimal(38,9),string[non-nullable]:string",
)


def _configure_custom_query_parser(custom_query_parser):
Expand Down
9 changes: 9 additions & 0 deletions data_validation/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ def exclusion_columns(self):
"""Return the exclusion columns from Config"""
return self._config.get(consts.CONFIG_EXCLUSION_COLUMNS, [])

@property
def allow_list(self):
    """Return the allow_list string from config.

    The value is the raw comma separated "source:target" datatype mapping
    string (e.g. "decimal(12,2):decimal(38,9)"); "" when not configured.
    """
    return self._config.get(consts.CONFIG_ALLOW_LIST, "")

@property
def filter_status(self):
"""Return filter status list from Config"""
Expand All @@ -300,6 +305,10 @@ def append_exclusion_columns(self, column_configs):
self.exclusion_columns + column_configs
)

def append_allow_list(self, allow_list):
    """Store the allow-list of datatype mappings in the config.

    NOTE(review): despite the name, this overwrites any previously stored
    value rather than appending to it — confirm that is the intent.

    Args:
        allow_list: Comma separated "source:target" datatype mapping string.
    """
    self._config[consts.CONFIG_ALLOW_LIST] = allow_list

def get_source_ibis_table(self):
"""Return IbisTable from source."""
if not hasattr(self, "_source_ibis_table"):
Expand Down
1 change: 1 addition & 0 deletions data_validation/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
CONFIG_FILTER_TARGET_COLUMN = "target_column"
CONFIG_FILTER_TARGET_VALUE = "target_value"
CONFIG_EXCLUSION_COLUMNS = "exclusion_columns"
CONFIG_ALLOW_LIST = "allow_list"
CONFIG_FILTER_STATUS = "filter_status"

CONFIG_RESULT_HANDLER = "result_handler"
Expand Down
154 changes: 151 additions & 3 deletions data_validation/schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import datetime
import pandas
import logging

from data_validation import metadata, consts, clients

Expand Down Expand Up @@ -50,9 +51,11 @@ def execute(self):
target_fields = {}
for field_name, data_type in ibis_target_schema.items():
target_fields[field_name] = data_type

results = schema_validation_matching(
source_fields, target_fields, self.config_manager.exclusion_columns
source_fields,
target_fields,
self.config_manager.exclusion_columns,
self.config_manager.allow_list,
)
df = pandas.DataFrame(
results,
Expand Down Expand Up @@ -102,7 +105,9 @@ def execute(self):
return df


def schema_validation_matching(source_fields, target_fields, exclusion_fields):
def schema_validation_matching(
source_fields, target_fields, exclusion_fields, allow_list
):
"""Compare schemas between two dictionary objects"""
results = []
# Apply the casefold() function to lowercase the keys of source and target
Expand All @@ -120,6 +125,8 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
source_fields_casefold.pop(field, None)
target_fields_casefold.pop(field, None)

# allow list map in case of incompatible data types in source and target
allow_list_map = parse_allow_list(allow_list)
# Go through each source and check if target exists and matches
for source_field_name, source_field_type in source_fields_casefold.items():
# target field exists
Expand All @@ -136,6 +143,45 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
consts.VALIDATION_STATUS_SUCCESS,
]
)
elif string_val(source_field_type) in allow_list_map:

allowed_target_field_type = allow_list_map[
string_val(source_field_type)
]

(
name_mismatch,
higher_precision,
lower_precision,
) = parse_n_validate_datatypes(
string_val(source_field_type), allowed_target_field_type
)
if name_mismatch or lower_precision:
results.append(
[
source_field_name,
source_field_name,
str(source_field_type),
str(target_field_type),
consts.VALIDATION_STATUS_FAIL,
]
)
else:
if higher_precision:
logging.warning(
"Source and target data type has precision mismatch: %s - %s",
string_val(source_field_type),
str(target_field_type),
)
results.append(
[
source_field_name,
source_field_name,
string_val(source_field_type),
str(target_field_type),
consts.VALIDATION_STATUS_SUCCESS,
]
)
# target data type mismatch
else:
results.append(
Expand Down Expand Up @@ -172,3 +218,105 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
]
)
return results


def parse_allow_list(st):
    """Parse a comma separated "source:target" datatype mapping string.

    A comma only separates mappings when the character after it is not a
    digit; commas inside a precision spec such as "decimal(12,2)" are kept
    as part of the datatype.

    Fixes over the previous version: an empty input now returns {} instead
    of {None: ""}, a trailing comma no longer raises IndexError, and the
    no-op ``i += 1`` inside the ``for`` loop was removed.

    Args:
        st: Raw allow-list string, e.g.
            "decimal(12,2):decimal(38,9),string[non-nullable]:string".
    Returns:
        dict mapping source datatype string -> target datatype string.
    """
    output = {}
    if not st:
        return output
    stack = []
    key = None
    for i, char in enumerate(st):
        if char == ":":
            # End of a source datatype: remember it as the pending key.
            key = "".join(stack)
            output[key] = None
            stack = []
        elif char == "," and (i + 1 >= len(st) or not st[i + 1].isdigit()):
            # Separator between mappings: flush the pending target value.
            output[key] = "".join(stack)
            stack = []
        else:
            stack.append(char)
    # Flush the final target value (skip when a trailing comma already did).
    if stack or output.get(key) is None:
        output[key] = "".join(stack)
    return output


def get_datatype_name(st):
    """Return the alphabetic (a-z) characters of *st* lowercased, e.g.
    "decimal(10,2)" -> "decimal"; -1 when *st* contains no letters."""
    letters = [c.lower() for c in st if 97 <= ord(c.lower()) <= 122]
    return "".join(letters) or -1


# typea data types: int8,int16
def get_typea_numeric_sustr(st):
    """Concatenate every digit in *st* and return the result as an int,
    e.g. "int16" -> 16; -1 when *st* contains no digits."""
    digits = "".join(filter(str.isdigit, st))
    if not digits:
        return -1
    return int(digits)


# typeb data types: Decimal(10,2)
def get_typeb_numeric_sustr(st):
    """Return the two numeric parts (precision, scale) of a comma separated
    type string such as "Decimal(10,2)" -> (10, 2); each part is -1 when it
    holds no digits. Raises IndexError when *st* contains no comma — callers
    only pass types already known to contain one."""
    left, right = st.split(",")[0], st.split(",")[1]
    return get_typea_numeric_sustr(left), get_typea_numeric_sustr(right)


def string_val(st):
    """Return *st* coerced to str with every space character removed."""
    return "".join(str(st).split(" "))


def validate_typeb_vals(source, target):
    """Compare two (precision, scale) pairs.

    Returns:
        (higher_precision, lower_precision) flags for the target pair,
        relative to the source pair.
    """
    src_prec, src_scale = source[0], source[1]
    tgt_prec, tgt_scale = target[0], target[1]
    # Target loses information in either position -> lower precision.
    if src_prec > tgt_prec or src_scale > tgt_scale:
        return False, True
    if src_prec == tgt_prec and src_scale == tgt_scale:
        return False, False
    return True, False


def strip_null(st):
    """Drop the "[non-nullable]" marker from a datatype string, if present."""
    marker = "[non-nullable]"
    return st.replace(marker, "")


def parse_n_validate_datatypes(source, target):
    """Compare a source and target datatype string from the allow-list.

    Args:
        source: Source table datatype string (may carry a "[non-nullable]"
            marker).
        target: Target table datatype string.
    Returns:
        Tuple of three flags:
        - datatype names are mismatched (the other two flags are None then)
        - target has a higher precision value
        - target has a lower precision value
    """
    # Exact match once the nullability marker is stripped: nothing to flag.
    if strip_null(source) == target:
        return False, False, False
    # Different base type names, e.g. "decimal" vs "string": name mismatch.
    if get_datatype_name(source) != get_datatype_name(target):
        return True, None, None
    # Two-part precision form, e.g. decimal(10,2).
    if "(" in source:
        higher_precision, lower_precision = validate_typeb_vals(
            get_typeb_numeric_sustr(source), get_typeb_numeric_sustr(target)
        )
        return False, higher_precision, lower_precision
    # Single numeric suffix form, e.g. int8 vs int16 (-1 when absent).
    source_num = get_typea_numeric_sustr(source)
    target_num = get_typea_numeric_sustr(target)
    if source_num == target_num:
        return False, False, False
    if source_num > target_num:
        return False, False, True
    return False, True, False
4 changes: 2 additions & 2 deletions tests/unit/test_schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def test_schema_validation_matching(module_under_test):
],
]
assert expected_results == module_under_test.schema_validation_matching(
source_fields, target_fields, []
source_fields, target_fields, [], ""
)


Expand Down Expand Up @@ -210,7 +210,7 @@ def test_schema_validation_matching_exclusion_columns(module_under_test):
]

assert expected_results == module_under_test.schema_validation_matching(
source_fields, target_fields, ["field2"]
source_fields, target_fields, ["field2"], ""
)


Expand Down