Skip to content

Commit

Permalink
Updated function to create the filter logic by using the feature of i…
Browse files Browse the repository at this point in the history
…bis to turn table expressions into SQL statements.

This addresses bugs #945 and #950. Unfortunately, we depend on the version of sqlalchemy being 2.0 or later which has fixed
a problem with datetime being rendered by compile - see
https://docs.sqlalchemy.org/en/20/changelog/changelog_20.html#change-206ec1f2af3a0c93785758c723ba356f
  • Loading branch information
sundar-mudupalli-work committed Aug 29, 2023
1 parent 3ac9649 commit a6c82be
Showing 1 changed file with 50 additions and 103 deletions.
153 changes: 50 additions & 103 deletions data_validation/partition_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import ibis
import pandas
import logging
import re
from typing import List, Dict
from argparse import Namespace
import pandas as pd
Expand Down Expand Up @@ -72,76 +73,9 @@ def partition_configs(self) -> None:
yaml_configs_list = self._add_partition_filters(partition_filters)
self._store_partitions(yaml_configs_list)

@staticmethod
def _less_than_value(keys, values) -> str:
"""Takes the list of primary keys in keys and the value of the biggest element
and generates an expression which can be used in a where clause to filter all rows with primary keys equal or
smaller than the biggest element.
Returns:
String containing the expression - e.g. (birth_month < 5 OR (birth_month = 5 AND (birth_day <= 2)))
"""
primary_key = keys[0]
if isinstance(values[0], str):
value0 = "'" + values[0] + "'"
elif isinstance(values[0], pd.Timestamp):
primary_key = "cast(" + keys[0] + " as timestamp)"
value0 = "'" + str(values[0]) + "'"
else:
value0 = str(values[0])

if len(keys) == 1:
return primary_key + " < " + value0
else:
return (
primary_key
+ " < "
+ value0
+ " OR "
+ primary_key
+ " = "
+ value0
+ " AND ("
+ PartitionBuilder._less_than_value(keys[1:], values[1:])
+ ")"
)

@staticmethod
def _geq_value(keys, values) -> str:
"""Takes the list of primary keys in keys and the value of the smallest element
and generates an expression which can be used in a where clause to filter all rows with primary keys equal or
bigger than the smallest element.
Returns:
String containing the expression - e.g. (birth_month > 5 OR (birth_month = 5 AND (birth_day >= 2)))
"""
primary_key = keys[0]
if isinstance(values[0], str):
value0 = "'" + values[0] + "'"
elif isinstance(values[0], pd.Timestamp):
primary_key = "cast(" + keys[0] + " as timestamp)"
value0 = "'" + str(values[0]) + "'"
else:
value0 = str(values[0])

if len(keys) == 1:
return primary_key + " >= " + value0
else:
return (
primary_key
+ " > "
+ value0
+ " OR "
+ primary_key
+ " = "
+ value0
+ " AND ("
+ PartitionBuilder._geq_value(keys[1:], values[1:])
+ ")"
)

def _get_partition_key_filters(self) -> List[List[str]]:
"""Generate where clauses for each partition for each table pair. We are partitioning the tables based on keys, so that
def _get_partition_key_filters(self) -> List[List[List[str]]]:
"""Generate where clauses for each partition for each table pair. We have to create separate where for the source and
target as they may each have a different filter applied to them. We are partitioning the tables based on keys, so that
we get equivalent sized partitions that can be compared against each other. With this approach, we can validate the
partitions in parallel leading to horizontal scaling of DVT. The design doc for this section is available in
docs/internal/partition_table_prd.md
Expand Down Expand Up @@ -169,7 +103,8 @@ def _get_partition_key_filters(self) -> List[List[str]]:
config_manager.target_table,
validation_builder.target_builder,
)

target_table = target_partition_row_builder.query

# Get Source and Target row Count
source_count = source_partition_row_builder.get_count()
target_count = target_partition_row_builder.get_count()
Expand Down Expand Up @@ -239,45 +174,57 @@ def _get_partition_key_filters(self) -> List[List[str]]:
# i.e. greater than or equal to first element and less than first element of next partition
# The first and the last partitions have special where clauses - less than first element of second
# partition and greater than or equal to the first element of the last partition respectively
filter_clause_list = []
filter_clause_list.append(
self._less_than_value(
self.primary_keys, first_elements[1, : len(self.primary_keys)]
)
)
source_where_list = []
target_where_list = []

# Given a ibis filter expression, the following lambda function extracts the where clause in plain text
extract_where = lambda x : re.sub("\s\s+", " ", ibis.to_sql(x).sql.split('WHERE')[1]).replace('t0.','')

# Given a list of primary keys and corresponding values, the following lambda function builds the filter expression
# to find all rows before the row containing the values in the sort order. The next function geq_value, finds all
# rows after the row containing the values in the sort order, including the row specified by values.
less_than_value = lambda table, keys, values : table.__getattr__(keys[0]) < values[0] if len(keys) == 1 \
else (table.__getattr__(keys[0]) < values[0]) | \
((table.__getattr__(keys[0]) == values[0]) & less_than_value(table, keys[1:], values[1:]))
geq_value = lambda table, keys, values : table.__getattr__(keys[0]) >= values[0] if len(keys) == 1 \
else (table.__getattr__(keys[0]) > values[0]) | \
((table.__getattr__(keys[0]) == values[0]) & geq_value(table, keys[1:], values[1:]))

filter_source_clause = less_than_value(source_table, self.primary_keys, first_elements[1, :len(self.primary_keys)])
filter_target_clause = less_than_value(target_table, self.primary_keys, first_elements[1, :len(self.primary_keys)])
source_where_list.append(extract_where(source_table.filter(filter_source_clause)))
target_where_list.append(extract_where(target_table.filter(filter_target_clause)))

for i in range(1, first_elements.shape[0] - 1):
filter_clause_list.append(
"("
+ self._geq_value(
self.primary_keys, first_elements[i, : len(self.primary_keys)]
)
+ ") AND ("
+ self._less_than_value(
self.primary_keys,
first_elements[i + 1, : len(self.primary_keys)],
)
+ ")"
)
filter_clause_list.append(
self._geq_value(
self.primary_keys,
first_elements[len(first_elements) - 1, : len(self.primary_keys)],
)
)

master_filter_list.append(filter_clause_list)
filter_source_clause = \
geq_value(source_table, self.primary_keys, first_elements[i, : len(self.primary_keys)]) & \
less_than_value(source_table, self.primary_keys, first_elements[i + 1, : len(self.primary_keys)])
filter_target_clause = \
geq_value(target_table, self.primary_keys, first_elements[i, : len(self.primary_keys)]) & \
less_than_value(target_table, self.primary_keys, first_elements[i + 1, : len(self.primary_keys)])
source_where_list.append(extract_where(source_table.filter(filter_source_clause)))
target_where_list.append(extract_where(target_table.filter(filter_target_clause)))
filter_source_clause = \
geq_value(source_table, self.primary_keys, first_elements[len(first_elements) - 1, : len(self.primary_keys)])
filter_target_clause = \
geq_value(target_table, self.primary_keys, first_elements[len(first_elements) - 1, : len(self.primary_keys)])
source_where_list.append(extract_where(source_table.filter(filter_source_clause)))
target_where_list.append(extract_where(target_table.filter(filter_target_clause)))
master_filter_list.append([source_where_list, target_where_list])
breakpoint()
return master_filter_list

def _add_partition_filters(
self,
partition_filters: List[List[str]],
partition_filters: List[List[List[str]]],
) -> List[Dict]:
"""Add Partition Filters to ConfigManager and return a list of dict
ConfigManager objects.
Args:
partition_filters (List[List[str]]): List of List of Partition filters
for all Table/ConfigManager objects
partition_filters (List[List[List[str]]]): List of List of Partition filters
for all Table/ConfigManager objects, two list of filters for each configManager object,
one for the source and one for the target
Returns:
yaml_configs_list (List[Dict]): List of YAML configs for all tables
Expand All @@ -293,11 +240,11 @@ def _add_partition_filters(
"target_folder_name": config_manager.full_source_table,
"partitions": [],
}
for pos in range(len(filter_list)):
for pos in range(len(filter_list[0])):
filter_dict = {
"type": "custom",
"source": filter_list[pos],
"target": filter_list[pos],
"source": filter_list[0][pos],
"target": filter_list[1][pos],
}
# Append partition new filter
config_manager.filters.append(filter_dict)
Expand Down

0 comments on commit a6c82be

Please sign in to comment.