From 269f8dc7d8afa78fe8ecc8c79b0eb3d197d1e8f0 Mon Sep 17 00:00:00 2001 From: Prayas Purusottam Date: Mon, 16 Jan 2023 16:01:53 +0530 Subject: [PATCH] feat: Logic to add allow-list to support datatype matching with a provided list in case of mismatched datatypes between source and target (#643) --- README.md | 5 +- data_validation/__main__.py | 3 +- data_validation/cli_tools.py | 5 + data_validation/config_manager.py | 9 ++ data_validation/consts.py | 1 + data_validation/schema_validation.py | 154 ++++++++++++++++++++++++++- tests/unit/test_schema_validation.py | 4 +- 7 files changed, 174 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 2b74a2eb0..fa55fa692 100644 --- a/README.md +++ b/README.md @@ -227,7 +227,10 @@ data-validation (--verbose or -v) (--log-level or -ll) validate schema [--filter-status or -fs STATUSES_LIST] Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned. [--exclusion-columns or -ec EXCLUSION_COLUMNS] - Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b. + Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b. + + [--allow-list or -al ALLOW_LIST] + Comma separated list of data-type mappings of source and destination data sources which will be validated in case of missing data types in destination data source. 
e.g: "decimal(4,2):decimal(5,4),string[non-nullable]:string" ``` #### Custom Query Column Validations diff --git a/data_validation/__main__.py b/data_validation/__main__.py index 52fbdcc50..202e68577 100644 --- a/data_validation/__main__.py +++ b/data_validation/__main__.py @@ -311,7 +311,8 @@ def build_config_managers_from_args(args): config_manager.append_exclusion_columns( [col.casefold() for col in exclusion_columns] ) - + if args.allow_list is not None: + config_manager.append_allow_list(args.allow_list) configs.append(config_manager) return configs diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py index 80d3260af..807d3bab9 100644 --- a/data_validation/cli_tools.py +++ b/data_validation/cli_tools.py @@ -490,6 +490,11 @@ def _configure_schema_parser(schema_parser): "-ec", help="Comma separated list of columns 'col_a,col_b' to be excluded from the schema validation", ) + schema_parser.add_argument( + "--allow-list", + "-al", + help="Comma separated list of datatype mappings due to incompatible datatypes in source and target. 
e.g: decimal(12,2):decimal(38,9),string[non-nullable]:string", + ) def _configure_custom_query_parser(custom_query_parser): diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py index 4b4693d1b..a059529a9 100644 --- a/data_validation/config_manager.py +++ b/data_validation/config_manager.py @@ -289,6 +289,11 @@ def exclusion_columns(self): """Return the exclusion columns from Config""" return self._config.get(consts.CONFIG_EXCLUSION_COLUMNS, []) + @property + def allow_list(self): + """Return the allow_list from Config""" + return self._config.get(consts.CONFIG_ALLOW_LIST, "") + @property def filter_status(self): """Return filter status list from Config""" @@ -300,6 +305,10 @@ def append_exclusion_columns(self, column_configs): self.exclusion_columns + column_configs ) + def append_allow_list(self, allow_list): + """Append allow_list of datatype to existing config.""" + self._config[consts.CONFIG_ALLOW_LIST] = allow_list + def get_source_ibis_table(self): """Return IbisTable from source.""" if not hasattr(self, "_source_ibis_table"): diff --git a/data_validation/consts.py b/data_validation/consts.py index da2495a98..1d227cc4d 100644 --- a/data_validation/consts.py +++ b/data_validation/consts.py @@ -60,6 +60,7 @@ CONFIG_FILTER_TARGET_COLUMN = "target_column" CONFIG_FILTER_TARGET_VALUE = "target_value" CONFIG_EXCLUSION_COLUMNS = "exclusion_columns" +CONFIG_ALLOW_LIST = "allow_list" CONFIG_FILTER_STATUS = "filter_status" CONFIG_RESULT_HANDLER = "result_handler" diff --git a/data_validation/schema_validation.py b/data_validation/schema_validation.py index 4cb7583fb..92000fa74 100644 --- a/data_validation/schema_validation.py +++ b/data_validation/schema_validation.py @@ -14,6 +14,7 @@ import datetime import pandas +import logging from data_validation import metadata, consts, clients @@ -50,9 +51,11 @@ def execute(self): target_fields = {} for field_name, data_type in ibis_target_schema.items(): target_fields[field_name] = data_type - 
results = schema_validation_matching( - source_fields, target_fields, self.config_manager.exclusion_columns + source_fields, + target_fields, + self.config_manager.exclusion_columns, + self.config_manager.allow_list, ) df = pandas.DataFrame( results, @@ -102,7 +105,9 @@ def execute(self): return df -def schema_validation_matching(source_fields, target_fields, exclusion_fields): +def schema_validation_matching( + source_fields, target_fields, exclusion_fields, allow_list +): """Compare schemas between two dictionary objects""" results = [] # Apply the casefold() function to lowercase the keys of source and target @@ -120,6 +125,8 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields): source_fields_casefold.pop(field, None) target_fields_casefold.pop(field, None) + # allow list map in case of incompatible data types in source and target + allow_list_map = parse_allow_list(allow_list) # Go through each source and check if target exists and matches for source_field_name, source_field_type in source_fields_casefold.items(): # target field exists @@ -136,6 +143,45 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields): consts.VALIDATION_STATUS_SUCCESS, ] ) + elif string_val(source_field_type) in allow_list_map: + + allowed_target_field_type = allow_list_map[ + string_val(source_field_type) + ] + + ( + name_mismatch, + higher_precision, + lower_precision, + ) = parse_n_validate_datatypes( + string_val(source_field_type), allowed_target_field_type + ) + if name_mismatch or lower_precision: + results.append( + [ + source_field_name, + source_field_name, + str(source_field_type), + str(target_field_type), + consts.VALIDATION_STATUS_FAIL, + ] + ) + else: + if higher_precision: + logging.warning( + "Source and target data type has precision mismatch: %s - %s", + string_val(source_field_type), + str(target_field_type), + ) + results.append( + [ + source_field_name, + source_field_name, + string_val(source_field_type), + 
str(target_field_type),
+                            consts.VALIDATION_STATUS_SUCCESS,
+                        ]
+                    )
             # target data type mismatch
             else:
                 results.append(
@@ -172,3 +218,157 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
                 ]
             )
     return results
+
+
+def parse_allow_list(st):
+    """Parse the ``--allow-list`` string into a source->target type map.
+
+    Entries are separated by commas and have the form
+    ``source_type:target_type``. Commas nested inside brackets, such as
+    the one in ``decimal(4,2)``, belong to the data type and do not split
+    entries.
+
+    Args:
+        st (str): Raw allow-list, e.g.
+            ``"decimal(4,2):decimal(5,4),string[non-nullable]:string"``.
+    Returns:
+        dict: Source data type string mapped to its allowed target data
+            type string. Empty input and entries without a ``:`` yield
+            no mapping.
+    """
+    output = {}
+    if not st:
+        return output
+    entries = []
+    stack = []
+    depth = 0
+    # Whitespace is dropped because looked-up types come from string_val().
+    for char in string_val(st):
+        if char in "([<":
+            depth += 1
+        elif char in ")]>":
+            depth = max(depth - 1, 0)
+        if char == "," and depth == 0:
+            entries.append("".join(stack))
+            stack = []
+            continue
+        stack.append(char)
+    entries.append("".join(stack))
+    for entry in entries:
+        key, separator, value = entry.partition(":")
+        if separator and key and value:
+            output[key] = value
+    return output
+
+
+def get_datatype_name(st):
+    """Return the lowercased ASCII letters of a data type string.
+
+    Args:
+        st (str): Data type string, e.g. ``"Decimal(10,2)"``.
+    Returns:
+        str | int: The letters joined together (``"decimal"``), or -1 when
+            the string contains no ASCII letter.
+    """
+    out = "".join(char for char in st.lower() if "a" <= char <= "z")
+    if out == "":
+        return -1
+    return out
+
+
+# typea data types: int8,int16
+def get_typea_numeric_sustr(st):
+    """Return all decimal digits of a string joined into one integer.
+
+    Args:
+        st (str): Data type string or fragment, e.g. ``"int16"``.
+    Returns:
+        int: The digits read as a single number (16), or -1 when the
+            string contains no digit.
+    """
+    num = "".join(char for char in st if char.isdecimal())
+    if num == "":
+        return -1
+    return int(num)
+
+
+# typeb data types: Decimal(10,2)
+def get_typeb_numeric_sustr(st):
+    """Return the numbers found before and after the first comma.
+
+    Args:
+        st (str): Data type string, e.g. ``"Decimal(10,2)"``.
+    Returns:
+        tuple: ``(first, second)`` integers, e.g. ``(10, 2)``. A part
+            without digits, or a second part missing because the string
+            has no comma, is reported as -1.
+    """
+    parts = st.split(",")
+    first_half = parts[0]
+    # A type such as "decimal(10)" has no comma, hence no second number.
+    second_half = parts[1] if len(parts) > 1 else ""
+    first_half_num = get_typea_numeric_sustr(first_half)
+    second_half_num = get_typea_numeric_sustr(second_half)
+    return first_half_num, second_half_num
+
+
+def string_val(st):
+    """Return ``str(st)`` with every space character removed."""
+    return str(st).replace(" ", "")
+
+
+def validate_typeb_vals(source, target):
+    """Compare the ``(first, second)`` number pairs of two typeb data types.
+
+    Args:
+        source (tuple): Numbers parsed from the source data type.
+        target (tuple): Numbers parsed from the target data type.
+    Returns:
+        tuple: ``(higher_precision, lower_precision)`` booleans describing
+            the target relative to the source; both False when equal.
+    """
+    if source[0] > target[0] or source[1] > target[1]:
+        return False, True
+    elif source[0] == target[0] and source[1] == target[1]:
+        return False, False
+    return True, False
+
+
+def strip_null(st):
+    """Return the data type string without its ``[non-nullable]`` marker."""
+    return st.replace("[non-nullable]", "")
+
+
+def parse_n_validate_datatypes(source, target):
+    """Compare a source data type with its allowed target data type.
+
+    Args:
+        source: Source table datatype string
+        target: Target table datatype string
+    Returns:
+        bool:source and target datatype names are mismatched or not
+        bool:target has higher precision value (None on name mismatch)
+        bool:target has lower precision value (None on name mismatch)
+    """
+    # Nullability is neither part of the type name nor of its precision, so
+    # drop it up front, e.g. "int32[non-nullable]" must compare as "int32".
+    source = strip_null(source)
+    if source == target:
+        return False, False, False
+    if get_datatype_name(source) != get_datatype_name(target):
+        return True, None, None
+    # Check for type of precisions supplied e.g: int8,Decimal(10,2),int
+    if "(" in source:
+        typeb_source = get_typeb_numeric_sustr(source)
+        typeb_target = get_typeb_numeric_sustr(target)
+        higher_precision, lower_precision = validate_typeb_vals(
+            typeb_source, typeb_target
+        )
+        return False, higher_precision, lower_precision
+    source_num = get_typea_numeric_sustr(source)
+    target_num = get_typea_numeric_sustr(target)
+    if source_num == target_num:
+        return False, False, False
+    elif source_num > target_num:
+        return False, False, True
+    return False, True, False
diff --git a/tests/unit/test_schema_validation.py b/tests/unit/test_schema_validation.py
index a38553604..2082e8c04 100644
--- a/tests/unit/test_schema_validation.py
+++ b/tests/unit/test_schema_validation.py
@@ -177,7 +177,7 @@ def test_schema_validation_matching(module_under_test):
         ],
     ]
     assert expected_results == module_under_test.schema_validation_matching(
-        source_fields, target_fields, []
+        source_fields, target_fields, [], ""
     )
 
 
@@ -210,7 +210,7 @@ def test_schema_validation_matching_exclusion_columns(module_under_test):
     ]
 
     assert expected_results == module_under_test.schema_validation_matching(
-        source_fields, target_fields, ["field2"]
+        source_fields, target_fields, ["field2"], ""
     )
 
 