Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Issue619 determine the method for generating batches efficiently #653

Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
7d99ebf
init
mohdt786 Dec 15, 2022
80c0f8b
init
mohdt786 Dec 15, 2022
032f883
init
mohdt786 Dec 16, 2022
bf95ea4
init
mohdt786 Dec 16, 2022
2703427
Added Function: build_and_store_config_files
mohdt786 Dec 18, 2022
1dedce3
feat: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
7863f67
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Dec 21, 2022
09e2132
fix: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
524db2d
testing and linting
mohdt786 Dec 21, 2022
259c548
fix: linting and test
mohdt786 Dec 21, 2022
96cf91d
fix: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
b6255bc
feat: Added Partition support to generate multiple yaml config files
mohdt786 Dec 21, 2022
4bf5f4f
fix: removed type hint for method - get_pandas_df
mohdt786 Dec 21, 2022
58a7a00
linting and fixes
mohdt786 Dec 22, 2022
07333d8
type hints and lint
mohdt786 Dec 22, 2022
32fd216
fix: Raise value error for:
mohdt786 Dec 22, 2022
b5145b8
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Dec 28, 2022
da69e8b
fix: Updated arguments and argument parser
mohdt786 Dec 28, 2022
99bb79c
feat: Added Partition support to generate multiple yaml config files
mohdt786 Jan 4, 2023
1e28a4a
Linting and TypeHint added
mohdt786 Jan 4, 2023
2b2e76a
fix: Added Warning logs and Documentation
mohdt786 Jan 4, 2023
1af3f35
fix: removed unwanted Docs
mohdt786 Jan 4, 2023
45e51b9
fix: Added --tables-list to required for generate-partitions row command
mohdt786 Jan 6, 2023
57549b3
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 6, 2023
c34bd8a
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 11, 2023
d208a18
refactor: Removed ROW command and added desc to README.md
mohdt786 Jan 16, 2023
06bb8e3
feat: Added support to save partition configs to GCS bucket
mohdt786 Jan 18, 2023
654061f
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 18, 2023
e031ca5
fix: fixed path conflicts
mohdt786 Jan 18, 2023
fda93a4
fix: re-wrote logs which are used in tests/unit/test_cli_tools.py::te…
mohdt786 Jan 18, 2023
ffba13e
test: Added test cases for partition_builder.py
mohdt786 Jan 19, 2023
20d9556
fix: extra line adjustment in consts.py
mohdt786 Jan 19, 2023
040d801
fix: updated source/target partition_key datatype check to support py…
mohdt786 Jan 19, 2023
4c2e330
Added test cases for storing partitions in local dir
mohdt786 Jan 20, 2023
56e4354
Merge branch 'develop' into issue619-determine-the-method-for-generat…
mohdt786 Jan 20, 2023
35d99bc
lint: tests/unit/test_partition_builder.py
mohdt786 Jan 20, 2023
936421f
Added test inputs to tests/unit/test_inputs/test_partition_builder.json
mohdt786 Jan 24, 2023
bb82253
lint: tests/unit.test_partition_builder.py
mohdt786 Jan 24, 2023
4fc5a55
fix: Reduced test input size
mohdt786 Jan 24, 2023
e531d8e
fix: Added logging before writing partitions
mohdt786 Jan 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,14 @@ terraform.rc

# Custom
*.yaml
partitions_dir

# Test temp files
source_table_data.json
target_table_data.json
data_validation/cli_tools.py
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
source-sql.sql

# Commit msg
commits.txt
test.py
315 changes: 310 additions & 5 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import sys
import logging
from yaml import Dumper, dump

from typing import List, Tuple
import pandas
from argparse import Namespace
from data_validation import (
cli_tools,
clients,
Expand Down Expand Up @@ -49,13 +51,40 @@ def _get_arg_config_file(args):
return args.config_file


def _get_arg_config_dir(args) -> str:
"""Return String yaml config folder path."""
if not args.config_dir:
raise ValueError("YAML Config Dir Path was not supplied.")

return args.config_dir


def _get_arg_partition_type(args: Namespace) -> str:
"""Return the type of Partition Logic to be used from args

Args:
args (Namespace): User specified Arguments.

Returns:
Type of Partition logic to be used to split Config-Files
"""
if not args.partition_type: # Use default Partition logic if not supplied
return consts.DEFAULT_PARTITION_TYPE
elif args.partition_type not in consts.PARTITION_TYPES:
# Already handled in argparser argument choices,
# check kept for explicit calls
raise ValueError(f"Unknown Partition Type: {args.partition_type}")

return args.partition_type


def _get_yaml_config_from_file(config_file_path):
"""Return Dict of yaml validation data."""
yaml_config = cli_tools.get_validation(config_file_path)
return yaml_config


def get_aggregate_config(args, config_manager):
def get_aggregate_config(args, config_manager: ConfigManager):
"""Return list of formated aggregation objects.

Args:
Expand Down Expand Up @@ -472,10 +501,284 @@ def store_yaml_config_file(args, config_managers):
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
""" """
config_managers = build_config_managers_from_args(args)
def partition_and_store_config_files(args: Namespace) -> None:
"""Build multiple YAML Config files using user specified partition logic

Args:
args (Namespace): User specified Arguments
config_managers (List[ConfigManager]): List of config manager instances.

Returns:
None
"""
if args.validate_cmd == "row":
config_managers = build_config_managers_from_args(args)
partition_configs(args, config_managers)
else:
raise ValueError(f"Validation Argument '{args.validate_cmd}' is not supported")


def partition_configs(args: Namespace, config_managers: List[ConfigManager]):
"""Takes a list of ConfigManager object and splits each it into multiple
ConfigManager objects applying supplied partition logic.

Args:
args (Namespace): User specified Arguments.
config_managers (List[ConfigManager]): List of config manager instances.

Returns:
None
"""

config_dir = _get_arg_config_dir(args)
partition_type = _get_arg_partition_type(args)
if partition_type == "primary_key":
partition_filters = _get_primary_key_partition_filters(args)
elif partition_type == "primary_key_mod":
# TODO: Add support for Primary_key + Mod
raise ValueError(f"Partition Type: '{partition_type}' is not supported")
elif partition_type == "hash_mod":
# TODO: Add support for Hash + Mod
raise ValueError(f"Partition Type: '{partition_type}' is not supported")

_add_partition_filters_and_store(
config_managers, partition_filters, config_dir, args
)


def _get_primary_key_partition_filters(args: Namespace) -> List[List[str]]:
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
"""Generate Partition filters for primary_key type partition for all
Configs/Tables.

Args:
args (Namespace): User specified Arguments.

Returns:
A list of lists of partition filters for each table
"""

# Build config managers for count, min and max on primary_key
agg_config_managers = build_primary_key_agg_config_managers_from_args(args)

master_filter_list = []
primary_keys = cli_tools.get_arg_list(args.primary_keys)
for ind in range(len(agg_config_managers)):

# Retrieve source and target agg dataframes
source_df, target_df = get_dataframe(agg_config_managers[ind])

source_datatype = source_df[source_df.columns[-1]].dtype.name
target_datatype = target_df[target_df.columns[-1]].dtype.name
accepted_datatypes = ("int64", "int32")
if (
source_datatype not in accepted_datatypes
and target_datatype not in accepted_datatypes
):
raise ValueError(
"Expected primary_key for partition to be of type int."
f"Got {source_df[source_df.columns[-1]].dtype.name}"
)

source_count = source_df.iloc[0]["count"]
target_count = target_df.iloc[0]["count"]
row_count = max(source_count, target_count)

# If supplied partition_num is greater than count(*) coalesce it
if args.partition_num > row_count:
partition_count = row_count
else:
partition_count = args.partition_num

primary_key = primary_keys[0]
min_column = f"min__{primary_key}"
max_column = f"max__{primary_key}"

source_min = source_df.iloc[0][min_column]
target_min = target_df.iloc[0][min_column]

lower_bound = min(source_min, target_min)

source_max = source_df.iloc[0][max_column]
target_max = target_df.iloc[0][max_column]

upper_bound = max(source_max, target_max)

filter_list = [] # Store partition filters per config/table
i = 0
marker = lower_bound
partition_step = (upper_bound - lower_bound) // partition_count
while i < partition_count:
lower_val = marker
upper_val = marker + partition_step

if i == partition_count - 1:
upper_val = upper_bound + 1

partition_filter = (
f"{primary_key} >= {lower_val} and {primary_key} < {upper_val}"
)
filter_list.append(partition_filter)

i += 1
marker += partition_step

master_filter_list.append(filter_list)

return master_filter_list


def get_dataframe(
config_manager: ConfigManager,
) -> Tuple[pandas.DataFrame, pandas.DataFrame]:
"""Build source and target pandas dataframes from input ConfigManager object.

Args:
config_manager (ConfigManager): config manager instance.

Returns:
A tuple of source and target pandas dataframe
"""
agg_validator = DataValidation(
config_manager.config, validation_builder=None, result_handler=None
)
source_df, target_df = agg_validator.get_pandas_df()
return (source_df, target_df)


def build_primary_key_agg_config_managers_from_args(
args: Namespace,
) -> List[ConfigManager]:
"""Build a list of ConfigManager object for finding count, min and max of primary_key.

This method is used for building ConfigManager objects for all the input
tables to get count, min and max of primary_key column. Used for finding
filters for primary_key type Partition.

Args:
args(Namespace): User specified Arguments.

Returns:
A list of type ConfigManager to get count, min and max of primary_key column.
"""
agg_config_managers = []
config_type = consts.COLUMN_VALIDATION
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
result_handler_config = None
primary_keys = cli_tools.get_arg_list(args.primary_keys)

filter_config, labels, threshold = [], [], 0.0
if args.filters:
filter_config = cli_tools.get_filters(args.filters)

labels = []
mgr = state_manager.StateManager()
source_client = clients.get_data_client(mgr.get_connection_config(args.source_conn))
target_client = clients.get_data_client(mgr.get_connection_config(args.target_conn))

format = "table"
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
use_random_rows = None
random_row_batch_size = None
is_filesystem = source_client._source_type == "FileSystem"

tables_list = cli_tools.get_tables_list(
args.tables_list, default_value=[{}], is_filesystem=is_filesystem
)

filter_status = None

for ind in range(len(tables_list)):
config_manager = ConfigManager.build_config_manager(
config_type,
args.source_conn,
args.target_conn,
tables_list[ind],
labels,
threshold,
format,
use_random_rows=use_random_rows,
random_row_batch_size=random_row_batch_size,
source_client=source_client,
target_client=target_client,
result_handler_config=result_handler_config,
filter_config=filter_config,
filter_status=filter_status,
verbose=args.verbose,
)

primary_key = primary_keys[0]
aggregate_configs = [config_manager.build_config_count_aggregate()]
aggregate_configs += config_manager.build_config_column_aggregates(
mohdt786 marked this conversation as resolved.
Show resolved Hide resolved
"count", [primary_key], None, None
)
aggregate_configs += config_manager.build_config_column_aggregates(
"min", [primary_key], None, None
)
aggregate_configs += config_manager.build_config_column_aggregates(
"max", [primary_key], None, None
)
config_manager.append_aggregates(aggregate_configs)

agg_config_managers.append(config_manager)
return agg_config_managers


def _add_partition_filters_and_store(
config_managers: List[ConfigManager],
partition_filters: List[List[str]],
config_dir: str,
args: Namespace,
):
"""Add Partition Filters to ConfigManager and return a list of ConfigManager objects.

Args:
config_manager (List[ConfigManager]): List of Config manager instances.
partition_filters (List[List[str]]): List of List of Partition filters
for all Table/ConfigManager objects

Returns:
None
"""

table_count = len(config_managers)
for ind in range(table_count):
config_manager = config_managers[ind]
filter_list = partition_filters[ind]

target_folder_name = config_manager.full_source_table
target_folder_path = cli_tools.get_target_table_folder_path(
config_dir, target_folder_name
)

for pos in range(len(filter_list)):
filter_dict = {
"type": "custom",
"source": filter_list[pos],
"target": filter_list[pos],
}
# Append partition new filter
config_manager.filters.append(filter_dict)

# Save partition
yaml_configs = convert_config_to_yaml(args, config_managers)
target_file_name = "0" * (4 - len(str(pos))) + str(pos) + ".yaml"
config_file_path = os.path.join(target_folder_path, target_file_name)
cli_tools.store_validation(config_file_path, yaml_configs)

# Pop last partition filter
config_manager.filters.pop()


def run(args) -> None:
"""Splits execution into:
1. Build and save single Yaml Config file
2. Run Validations

Args:
args (Namespace): User specified Arguments.

Returns:
None
"""
config_managers = build_config_managers_from_args(args)
if args.config_file:
store_yaml_config_file(args, config_managers)
else:
Expand Down Expand Up @@ -548,6 +851,8 @@ def main():
from data_validation import app

app.app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
elif args.command == "get-partitions":
partition_and_store_config_files(args)
else:
raise ValueError(f"Positional Argument '{args.command}' is not supported")

Expand Down
Loading