Skip to content

Commit

Permalink
Back out changes made for Kubernetes and by earlier attempts to fix #945 and #950
Browse files Browse the repository at this point in the history
  • Loading branch information
sundar-mudupalli-work committed Aug 30, 2023
1 parent 58d7951 commit 0d8296c
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 169 deletions.
24 changes: 6 additions & 18 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import logging
import os
import sys
import copy

from yaml import Dumper, dump
from argparse import Namespace
Expand Down Expand Up @@ -287,23 +286,12 @@ def build_config_managers_from_args(

def config_runner(args):
if args.config_dir:
if args.kube_completions and (('JOB_COMPLETION_INDEX' in os.environ.keys()) or ('CLOUD_RUN_TASK_INDEX' in os.environ.keys())) :
# Running in Kubernetes in Job completions - only run the yaml file corresponding to index
job_index = int(os.environ.get('JOB_COMPLETION_INDEX')) if 'JOB_COMPLETION_INDEX' in os.environ.keys() \
else int(os.environ.get('CLOUD_RUN_TASK_INDEX'))
config_file_path = args.config_dir + '{:04d}'.format(job_index) + '.yaml' if args.config_dir.endswith('/') \
else args.config_dir + '/' + '{:04d}'.format(job_index) + '.yaml'
setattr(args, 'config_dir', None)
setattr(args, 'config_file', config_file_path)
config_managers = build_config_managers_from_yaml(args, config_file_path)
else:
if args.kube_completions :
logging.warning("--kube-completions or -kubecomp specified, however not running in Kubernetes Job completion, check your command line")
mgr = state_manager.StateManager(file_system_root_path=args.config_dir)
config_file_names = mgr.list_validations_in_dir(args.config_dir)
config_managers = []
for file in config_file_names:
config_managers.extend(build_config_managers_from_yaml(args, file))
mgr = state_manager.StateManager(file_system_root_path=args.config_dir)
config_file_names = mgr.list_validations_in_dir(args.config_dir)

config_managers = []
for file in config_file_names:
config_managers.extend(build_config_managers_from_yaml(args, file))
else:
config_file_path = _get_arg_config_file(args)
config_managers = build_config_managers_from_yaml(args, config_file_path)
Expand Down
16 changes: 2 additions & 14 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,6 @@ def _configure_validation_config_parser(subparsers):
"-cdir",
help="Directory path containing YAML Config Files to be used for running validations.",
)
run_parser.add_argument(
"--kube-completions",
"-kubecomp",
action="store_true",
help="When validating multiple table partitions generated by generate-table-partitions, use this flag to tell Kubernetes",
)

get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
Expand Down Expand Up @@ -1058,14 +1052,8 @@ def get_validation(validation_name, config_dir=None):
mgr = state_manager.StateManager(file_system_root_path=config_dir)
return mgr.get_validation_config(validation_name, config_dir)
else:
if validation_name.startswith("gs://"):
obj_depth = len(validation_name.split("/"))
gcs_prefix = '/'.join(validation_name.split('/')[:obj_depth-1])
mgr = state_manager.StateManager(file_system_root_path=gcs_prefix)
return mgr.get_validation_config(validation_name.split('/')[obj_depth-1],gcs_prefix)
else:
mgr = state_manager.StateManager()
return mgr.get_validation_config(validation_name)
mgr = state_manager.StateManager()
return mgr.get_validation_config(validation_name)


def list_validations():
Expand Down
45 changes: 0 additions & 45 deletions tests/system/data_sources/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1144,51 +1144,6 @@ def test_bigquery_generate_table_partitions(mock_conn):
assert partition_filters[0] == EXPECTED_PARTITION_FILTER


# Expected result from partitioning table on 2 keys, of date and timestamp datatype
EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER = [
"cast(col_timestamp as timestamp) < '2023-07-04 09:15:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00+00:00' AND (cast(col_date as timestamp) < '2023-07-04 00:00:00')",
"(cast(col_timestamp as timestamp) > '2023-07-04 09:15:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00+00:00' AND (cast(col_date as timestamp) >= '2023-07-04 00:00:00')) AND (cast(col_timestamp as timestamp) < '2023-07-08 13:20:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00+00:00' AND (cast(col_date as timestamp) < '2023-07-08 00:00:00'))",
"(cast(col_timestamp as timestamp) > '2023-07-08 13:20:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00+00:00' AND (cast(col_date as timestamp) >= '2023-07-08 00:00:00')) AND (cast(col_timestamp as timestamp) < '2023-07-12 12:30:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00+00:00' AND (cast(col_date as timestamp) < '2023-07-12 00:00:00'))",
"cast(col_timestamp as timestamp) > '2023-07-12 12:30:00+00:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00+00:00' AND (cast(col_date as timestamp) >= '2023-07-12 00:00:00')",
]


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=BQ_CONN,
)
def test_generate_table_partitions_date_and_timestamp(mock_conn):
"""Test generate table partitions on BigQuery with date or/and timestamp as primary key.
The unit tests, specifically test_add_partition_filters_to_config and test_store_yaml_partitions_local
check that yaml configurations are created and saved in local storage. Partitions can only be created with
a database that can handle SQL with ntile, hence doing this as part of system testing.
What we are checking
1. the shape of the partition list is 1, number of partitions (only one table in the list)
2. value of the partition list matches what we expect.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
"generate-table-partitions",
"-sc=mock-conn",
"-tc=mock-conn",
"-tbls=pso_data_validator.test_generate_partitions_for_date_timestamp=pso_data_validator.test_generate_partitions_for_date_timestamp",
"-pk=col_timestamp,col_date",
"-hash=*",
"-cdir=/home/users/yaml",
"-pn=4",
]
)
config_managers = main.build_config_managers_from_args(args, consts.ROW_VALIDATION)
partition_builder = PartitionBuilder(config_managers, args)
partition_filters = partition_builder._get_partition_key_filters()
assert len(partition_filters) == 1 # only one pair of tables
assert (
len(partition_filters[0]) == partition_builder.args.partition_num
) # assume no of table rows > partition_num
assert partition_filters[0] == EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
return_value=BQ_CONN,
Expand Down
49 changes: 4 additions & 45 deletions tests/system/data_sources/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,51 +148,6 @@ def test_bigquery_generate_table_partitions():
assert partition_filters[0] == EXPECTED_PARTITION_FILTER


EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER = [
"cast(col_timestamp as timestamp) < '2023-07-04 09:15:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00' AND (cast(col_date as timestamp) < '2023-07-04 00:00:00')",
"(cast(col_timestamp as timestamp) > '2023-07-04 09:15:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00' AND (cast(col_date as timestamp) >= '2023-07-04 00:00:00')) AND (cast(col_timestamp as timestamp) < '2023-07-08 13:20:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00' AND (cast(col_date as timestamp) < '2023-07-08 00:00:00'))",
"(cast(col_timestamp as timestamp) > '2023-07-08 13:20:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00' AND (cast(col_date as timestamp) >= '2023-07-08 00:00:00')) AND (cast(col_timestamp as timestamp) < '2023-07-12 12:30:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00' AND (cast(col_date as timestamp) < '2023-07-12 00:00:00'))",
"cast(col_timestamp as timestamp) > '2023-07-12 12:30:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00' AND (cast(col_date as timestamp) >= '2023-07-12 00:00:00')",
]


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_generate_table_partitions_date_and_timestamp():
"""Test generate table partitions on Hive with date or/and timestamp as primary key.
The unit tests, specifically test_add_partition_filters_to_config and test_store_yaml_partitions_local
check that yaml configurations are created and saved in local storage. Partitions can only be created with
a database that can handle SQL with ntile, hence doing this as part of system testing.
What we are checking
1. the shape of the partition list is 1, number of partitions (only one table in the list)
2. value of the partition list matches what we expect.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
"generate-table-partitions",
"-sc=hive-conn",
"-tc=hive-conn",
"-tbls=pso_data_validator.test_generate_partitions_for_date_timestamp=pso_data_validator.test_generate_partitions_for_date_timestamp",
"-pk=col_timestamp,col_date",
"-hash=*",
"-cdir=/home/users/yaml",
"-pn=4",
]
)
config_managers = main.build_config_managers_from_args(args, consts.ROW_VALIDATION)
partition_builder = PartitionBuilder(config_managers, args)
partition_filters = partition_builder._get_partition_key_filters()

assert len(partition_filters) == 1 # only one pair of tables
assert (
len(partition_filters[0]) == partition_builder.args.partition_num
) # assume no of table rows > partition_num
assert partition_filters[0] == EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
Expand All @@ -211,6 +166,10 @@ def test_schema_validation_core_types_to_bigquery():
(
# All Hive integrals go to BigQuery INT64.
"--allow-list=int8:int64,int16:int64,int32:int64,"
# Hive decimals that map to BigQuery NUMERIC.
"decimal(20,0):decimal(38,9),decimal(10,2):decimal(38,9),"
# Hive decimals that map to BigQuery BIGNUMERIC.
"decimal(38,0):decimal(76,38),"
# Hive does not have a time zoned timestamp type.
"timestamp:timestamp('UTC'),"
# BigQuery does not have a float32 type.
Expand Down
53 changes: 6 additions & 47 deletions tests/system/data_sources/test_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ def test_schema_validation_core_types_to_bigquery():
"--exclusion-columns=id",
(
# Teradata integrals go to BigQuery INT64.
"--allow-list=int8:int64,int16:int64,int32:int64"
"--allow-list=int8:int64,int16:int64,int32:int64,"
# Teradata NUMBERS that map to BigQuery NUMERIC.
"decimal(20,0):decimal(38,9),decimal(10,2):decimal(38,9),"
# Teradata NUMBERS that map to BigQuery BIGNUMERIC.
"decimal(38,0):decimal(76,38)"
),
]
)
Expand Down Expand Up @@ -382,7 +386,7 @@ def test_row_validation_core_types():
new=mock_get_connection_config,
)
def test_teradata_generate_table_partitions():
"""Test generate table partitions on Teradata
"""Test generate table partitions on BigQuery
The unit tests, specifically test_add_partition_filters_to_config and test_store_yaml_partitions_local
check that yaml configurations are created and saved in local storage. Partitions can only be created with
a database that can handle SQL with ntile, hence doing this as part of system testing.
Expand Down Expand Up @@ -414,51 +418,6 @@ def test_teradata_generate_table_partitions():
assert partition_filters[0] == EXPECTED_PARTITION_FILTER


# Expected result from partitioning table on 2 keys, of date and timestamp datatype
EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER = [
"cast(col_timestamp as timestamp) < '2023-07-04 09:15:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00' AND (col_date < 2023-07-04)",
"(cast(col_timestamp as timestamp) > '2023-07-04 09:15:00' OR cast(col_timestamp as timestamp) = '2023-07-04 09:15:00' AND (col_date >= 2023-07-04)) AND (cast(col_timestamp as timestamp) < '2023-07-08 13:20:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00' AND (col_date < 2023-07-08))",
"(cast(col_timestamp as timestamp) > '2023-07-08 13:20:00' OR cast(col_timestamp as timestamp) = '2023-07-08 13:20:00' AND (col_date >= 2023-07-08)) AND (cast(col_timestamp as timestamp) < '2023-07-12 12:30:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00' AND (col_date < 2023-07-12))",
"cast(col_timestamp as timestamp) > '2023-07-12 12:30:00' OR cast(col_timestamp as timestamp) = '2023-07-12 12:30:00' AND (col_date >= 2023-07-12)",
]


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
)
def test_generate_table_partitions_date_and_timestamp():
"""Test generate table partitions on Teradata with date or/and timestamp as primary key.
The unit tests, specifically test_add_partition_filters_to_config and test_store_yaml_partitions_local
check that yaml configurations are created and saved in local storage. Partitions can only be created with
a database that can handle SQL with ntile, hence doing this as part of system testing.
What we are checking
1. the shape of the partition list is 1, number of partitions (only one table in the list)
2. value of the partition list matches what we expect.
"""
parser = cli_tools.configure_arg_parser()
args = parser.parse_args(
[
"generate-table-partitions",
"-sc=mock-conn",
"-tc=mock-conn",
"-tbls=udf.test_generate_partitions_for_date_timestamp=udf.test_generate_partitions_for_date_timestamp",
"-pk=col_timestamp,col_date",
"-hash=*",
"-cdir=/home/users/yaml",
"-pn=4",
]
)
config_managers = main.build_config_managers_from_args(args, consts.ROW_VALIDATION)
partition_builder = PartitionBuilder(config_managers, args)
partition_filters = partition_builder._get_partition_key_filters()
assert len(partition_filters) == 1 # only one pair of tables
assert (
len(partition_filters[0]) == partition_builder.args.partition_num
) # assume no of table rows > partition_num
assert partition_filters[0] == EXPECTED_TIMESTAMP_DATE_PARTITION_FILTER


@mock.patch(
"data_validation.state_manager.StateManager.get_connection_config",
new=mock_get_connection_config,
Expand Down

0 comments on commit 0d8296c

Please sign in to comment.