Skip to content

Commit

Permalink
feat: Logic to add allow-list to support datatype matching with a pro…
Browse files Browse the repository at this point in the history
…vided list in case of mismatched datatypes between source and target (#643)
  • Loading branch information
kanhaPrayas committed Jan 16, 2023
1 parent 196323e commit 269f8dc
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 7 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ data-validation (--verbose or -v) (--log-level or -ll) validate schema
[--filter-status or -fs STATUSES_LIST]
Comma separated list of statuses to filter the validation results. Supported statuses are (success, fail). If no list is provided, all statuses are returned.
[--exclusion-columns or -ec EXCLUSION_COLUMNS]
Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b.
Comma separated list of columns to be excluded from the schema validation, i.e col_a,col_b.
[--allow-list or -al ALLOW_LIST]
Comma separated list of data-type mappings of source and destination data sources which will be validated in case of missing data types in destination data source. e.g: "decimal(4,2):decimal(5,4),string[non-nullable]:string"
```

#### Custom Query Column Validations
Expand Down
3 changes: 2 additions & 1 deletion data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ def build_config_managers_from_args(args):
config_manager.append_exclusion_columns(
[col.casefold() for col in exclusion_columns]
)

if args.allow_list is not None:
config_manager.append_allow_list(args.allow_list)
configs.append(config_manager)

return configs
Expand Down
5 changes: 5 additions & 0 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,11 @@ def _configure_schema_parser(schema_parser):
"-ec",
help="Comma separated list of columns 'col_a,col_b' to be excluded from the schema validation",
)
schema_parser.add_argument(
"--allow-list",
"-al",
help="Comma separated list of datatype mappings due to incompatible datatypes in source and target. e.g: decimal(12,2):decimal(38,9),string[non-nullable]:string",
)


def _configure_custom_query_parser(custom_query_parser):
Expand Down
9 changes: 9 additions & 0 deletions data_validation/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ def exclusion_columns(self):
"""Return the exclusion columns from Config"""
return self._config.get(consts.CONFIG_EXCLUSION_COLUMNS, [])

@property
def allow_list(self):
"""Return the allow_list from Config"""
return self._config.get(consts.CONFIG_ALLOW_LIST, "")

@property
def filter_status(self):
"""Return filter status list from Config"""
Expand All @@ -300,6 +305,10 @@ def append_exclusion_columns(self, column_configs):
self.exclusion_columns + column_configs
)

def append_allow_list(self, allow_list):
"""Append allow_list of datatype to existing config."""
self._config[consts.CONFIG_ALLOW_LIST] = allow_list

def get_source_ibis_table(self):
"""Return IbisTable from source."""
if not hasattr(self, "_source_ibis_table"):
Expand Down
1 change: 1 addition & 0 deletions data_validation/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
CONFIG_FILTER_TARGET_COLUMN = "target_column"
CONFIG_FILTER_TARGET_VALUE = "target_value"
CONFIG_EXCLUSION_COLUMNS = "exclusion_columns"
CONFIG_ALLOW_LIST = "allow_list"
CONFIG_FILTER_STATUS = "filter_status"

CONFIG_RESULT_HANDLER = "result_handler"
Expand Down
154 changes: 151 additions & 3 deletions data_validation/schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import datetime
import pandas
import logging

from data_validation import metadata, consts, clients

Expand Down Expand Up @@ -50,9 +51,11 @@ def execute(self):
target_fields = {}
for field_name, data_type in ibis_target_schema.items():
target_fields[field_name] = data_type

results = schema_validation_matching(
source_fields, target_fields, self.config_manager.exclusion_columns
source_fields,
target_fields,
self.config_manager.exclusion_columns,
self.config_manager.allow_list,
)
df = pandas.DataFrame(
results,
Expand Down Expand Up @@ -102,7 +105,9 @@ def execute(self):
return df


def schema_validation_matching(source_fields, target_fields, exclusion_fields):
def schema_validation_matching(
source_fields, target_fields, exclusion_fields, allow_list
):
"""Compare schemas between two dictionary objects"""
results = []
# Apply the casefold() function to lowercase the keys of source and target
Expand All @@ -120,6 +125,8 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
source_fields_casefold.pop(field, None)
target_fields_casefold.pop(field, None)

# allow list map in case of incompatible data types in source and target
allow_list_map = parse_allow_list(allow_list)
# Go through each source and check if target exists and matches
for source_field_name, source_field_type in source_fields_casefold.items():
# target field exists
Expand All @@ -136,6 +143,45 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
consts.VALIDATION_STATUS_SUCCESS,
]
)
elif string_val(source_field_type) in allow_list_map:

allowed_target_field_type = allow_list_map[
string_val(source_field_type)
]

(
name_mismatch,
higher_precision,
lower_precision,
) = parse_n_validate_datatypes(
string_val(source_field_type), allowed_target_field_type
)
if name_mismatch or lower_precision:
results.append(
[
source_field_name,
source_field_name,
str(source_field_type),
str(target_field_type),
consts.VALIDATION_STATUS_FAIL,
]
)
else:
if higher_precision:
logging.warning(
"Source and target data type has precision mismatch: %s - %s",
string_val(source_field_type),
str(target_field_type),
)
results.append(
[
source_field_name,
source_field_name,
string_val(source_field_type),
str(target_field_type),
consts.VALIDATION_STATUS_SUCCESS,
]
)
# target data type mismatch
else:
results.append(
Expand Down Expand Up @@ -172,3 +218,105 @@ def schema_validation_matching(source_fields, target_fields, exclusion_fields):
]
)
return results


def parse_allow_list(st):
output = {}
stack = []
key = None
for i in range(len(st)):
if st[i] == ":":
key = "".join(stack)
output[key] = None
stack = []
continue
if st[i] == "," and not st[i + 1].isdigit():
value = "".join(stack)
output[key] = value
stack = []
i += 1
continue
stack.append(st[i])
value = "".join(stack)
output[key] = value
stack = []
return output


def get_datatype_name(st):
chars = []
for i in range(len(st)):
if ord(st[i].lower()) >= 97 and ord(st[i].lower()) <= 122:
chars.append(st[i].lower())
out = "".join(chars)
if out == "":
return -1
return out


# typea data types: int8,int16
def get_typea_numeric_sustr(st):
nums = []
for i in range(len(st)):
if st[i].isdigit():
nums.append(st[i])
num = "".join(nums)
if num == "":
return -1
return int(num)


# typeb data types: Decimal(10,2)
def get_typeb_numeric_sustr(st):
first_half = st.split(",")[0]
second_half = st.split(",")[1]
first_half_num = get_typea_numeric_sustr(first_half)
second_half_num = get_typea_numeric_sustr(second_half)
return first_half_num, second_half_num


def string_val(st):
return str(st).replace(" ", "")


def validate_typeb_vals(source, target):
if source[0] > target[0] or source[1] > target[1]:
return False, True
elif source[0] == target[0] and source[1] == target[1]:
return False, False
return True, False


def strip_null(st):
return st.replace("[non-nullable]", "")


def parse_n_validate_datatypes(source, target):
"""
Args:
source: Source table datatype string
target: Target table datatype string
Returns:
bool:source and target datatype names are missmatched or not
bool:target has higher precision value
bool:target has lower precision value
"""
if strip_null(source) == target:
return False, False, False
if get_datatype_name(source) != get_datatype_name(target):
return True, None, None
# Check for type of precisions supplied e.g: int8,Decimal(10,2),int
if "(" in source:
typeb_source = get_typeb_numeric_sustr(source)
typeb_target = get_typeb_numeric_sustr(target)
higher_precision, lower_precision = validate_typeb_vals(
typeb_source, typeb_target
)
return False, higher_precision, lower_precision
source_num = get_typea_numeric_sustr(source)
target_num = get_typea_numeric_sustr(target)
if source_num == target_num:
return False, False, False
elif source_num > target_num:
return False, False, True
return False, True, False
4 changes: 2 additions & 2 deletions tests/unit/test_schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def test_schema_validation_matching(module_under_test):
],
]
assert expected_results == module_under_test.schema_validation_matching(
source_fields, target_fields, []
source_fields, target_fields, [], ""
)


Expand Down Expand Up @@ -210,7 +210,7 @@ def test_schema_validation_matching_exclusion_columns(module_under_test):
]

assert expected_results == module_under_test.schema_validation_matching(
source_fields, target_fields, ["field2"]
source_fields, target_fields, ["field2"], ""
)


Expand Down

0 comments on commit 269f8dc

Please sign in to comment.