Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: GCS support for validation configs #340

Merged
merged 8 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ case specific CLI arguments or editing the saved YAML configuration file.
For example, the following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`
citibike.yaml`.

The vaildation config file is saved to the GCS path specified by the `PSO_DV_CONFIG_HOME` env variable if that has been set; otherwise, it is saved to wherever the tool is run.

Here is the generated YAML file named `citibike.yaml`:

Expand Down Expand Up @@ -233,12 +235,14 @@ Once the file is updated and saved, the following command runs the new
validation:

```
data-validation run-config -c citibike.yaml
data-validation configs run -c citibike.yaml
```

View the complete YAML file for a GroupedColumn validation on the
[examples](docs/examples.md#) page.

You can view a list of all saved validation YAML files using `data-validation configs list`, and print a YAML config using `data-validation configs get -c citibike.yaml`.

### Aggregated Fields

Aggregate fields contain the SQL fields that you want to produce an aggregate
Expand Down
47 changes: 33 additions & 14 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import logging
import json
from yaml import dump, load, Dumper, Loader

from data_validation import (
cli_tools,
Expand All @@ -28,6 +27,9 @@
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation

from yaml import dump
import sys


def _get_arg_config_file(args):
"""Return String yaml config file path."""
Expand All @@ -39,10 +41,8 @@ def _get_arg_config_file(args):

def _get_yaml_config_from_file(config_file_path):
"""Return Dict of yaml validation data."""
with open(config_file_path, "r") as yaml_file:
yaml_configs = load(yaml_file.read(), Loader=Loader)

return yaml_configs
yaml_config = cli_tools.get_validation(config_file_path)
return yaml_config


def get_aggregate_config(args, config_manager):
Expand Down Expand Up @@ -336,12 +336,9 @@ def store_yaml_config_file(args, config_managers):
Args:
config_managers (list[ConfigManager]): List of config manager instances.
"""
config_file_path = _get_arg_config_file(args)
yaml_configs = convert_config_to_yaml(args, config_managers)
yaml_config_str = dump(yaml_configs, Dumper=Dumper)

with open(config_file_path, "w") as yaml_file:
yaml_file.write(yaml_config_str)
config_file_path = _get_arg_config_file(args)
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
Expand All @@ -355,7 +352,7 @@ def run(args):


def run_connections(args):
""" Run commands related to connection management."""
"""Run commands related to connection management."""
if args.connect_cmd == "list":
cli_tools.list_connections()
elif args.connect_cmd == "add":
Expand All @@ -367,8 +364,29 @@ def run_connections(args):
raise ValueError(f"Connections Argument '{args.connect_cmd}' is not supported")


def run_config(args):
"""Run commands related to validation config YAMLs (legacy - superceded by run_validation_configs)."""
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)


def run_validation_configs(args):
"""Run commands related to validation config YAMLs."""
if args.validation_config_cmd == "run":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
elif args.validation_config_cmd == "list":
cli_tools.list_validations()
elif args.validation_config_cmd == "get":
# Get and print yaml file config.
yaml = cli_tools.get_validation(_get_arg_config_file(args))
dump(yaml, sys.stdout)
else:
raise ValueError(f"Configs argument '{args.validate_cmd}' is not supported")


def validate(args):
""" Run commands related to data validation."""
"""Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
run(args)
else:
Expand All @@ -384,8 +402,9 @@ def main():
elif args.command == "connections":
run_connections(args)
elif args.command == "run-config":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
run_config(args)
elif args.command == "configs":
run_validation_configs(args)
elif args.command == "find-tables":
print(find_tables_using_string_matching(args))
elif args.command == "query":
Expand Down
82 changes: 65 additions & 17 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""The Data Validation CLI tool is intended to help to build and execute
data validation runs with ease.

Expand Down Expand Up @@ -53,7 +51,6 @@
from data_validation import consts
from data_validation import state_manager


CONNECTION_SOURCE_FIELDS = {
"BigQuery": [
["project_id", "GCP Project to use for BigQuery"],
Expand Down Expand Up @@ -133,7 +130,7 @@


def get_parsed_args():
""" Return ArgParser with configured CLI arguments."""
"""Return ArgParser with configured CLI arguments."""
parser = configure_arg_parser()
return parser.parse_args()

Expand All @@ -149,6 +146,7 @@ def configure_arg_parser():
subparsers = parser.add_subparsers(dest="command")
_configure_validate_parser(subparsers)
_configure_run_config_parser(subparsers)
_configure_validation_config_parser(subparsers)
_configure_connection_parser(subparsers)
_configure_find_tables(subparsers)
_configure_raw_query(subparsers)
Expand Down Expand Up @@ -209,19 +207,49 @@ def _configure_raw_query(subparsers):


def _configure_run_config_parser(subparsers):
""" Configure arguments to run a data validation YAML config."""
"""Configure arguments to run a data validation YAML config using the legacy run-config command."""
run_config_parser = subparsers.add_parser(
"run-config", help="Run validations stored in a YAML config file"
"run-config",
help="Run validations stored in a YAML config file. Note: the 'configs run' command is now the recommended approach",
)

run_config_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)


def _configure_validation_config_parser(subparsers):
"""Configure arguments to run a data validation YAML config."""
validation_config_parser = subparsers.add_parser(
"configs", help="Run validations stored in a YAML config file"
)
configs_subparsers = validation_config_parser.add_subparsers(
dest="validation_config_cmd"
)
_ = configs_subparsers.add_parser("list", help="List your validation configs")
run_parser = configs_subparsers.add_parser(
"run", help="Run your validation configs"
)
run_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)

get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
)
get_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)


def _configure_run_parser(subparsers):
""" Configure arguments to run a data validation."""
"""Configure arguments to run a data validation."""

# subparsers = parser.add_subparsers(dest="command")

Expand Down Expand Up @@ -327,7 +355,7 @@ def _configure_run_parser(subparsers):


def _configure_connection_parser(subparsers):
""" Configure the Parser for Connection Management. """
"""Configure the Parser for Connection Management."""
connection_parser = subparsers.add_parser(
"connections", help="Manage & Store connections to your Databases"
)
Expand Down Expand Up @@ -487,7 +515,7 @@ def _add_common_arguments(parser):


def get_connection_config_from_args(args):
""" Return dict with connection config supplied."""
"""Return dict with connection config supplied."""
config = {consts.SOURCE_TYPE: args.connect_type}

if args.connect_type == "Raw":
Expand Down Expand Up @@ -525,7 +553,6 @@ def threshold_float(x):
# os.makedirs(dir_path)
# return dir_path


# def _get_connection_file(connection_name):
# dir_path = _get_data_validation_directory()
# file_name = f"{connection_name}.connection.json"
Expand All @@ -538,7 +565,7 @@ def _generate_random_name(conn):


def store_connection(connection_name, conn):
""" Store the connection config under the given name."""
"""Store the connection config under the given name."""
mgr = state_manager.StateManager()
mgr.create_connection(connection_name, conn)

Expand All @@ -549,7 +576,6 @@ def store_connection(connection_name, conn):
# with open(file_path, "w") as file:
# file.write(json.dumps(conn))


# def get_connections():
# """ Return dict with connection name and path key pairs."""
# connections = {}
Expand All @@ -567,7 +593,7 @@ def store_connection(connection_name, conn):


def list_connections():
""" List all saved connections."""
"""List all saved connections."""
mgr = state_manager.StateManager()
connections = mgr.list_connections()

Expand All @@ -576,7 +602,7 @@ def list_connections():


def get_connection(connection_name):
""" Return dict connection details for a specific connection."""
"""Return dict connection details for a specific connection."""
mgr = state_manager.StateManager()
return mgr.get_connection_config(connection_name)

Expand All @@ -586,8 +612,30 @@ def get_connection(connection_name):
# return json.loads(conn_str)


def store_validation(validation_file_name, yaml_config):
"""Store the validation YAML config under the given name."""
mgr = state_manager.StateManager()
mgr.create_validation_yaml(validation_file_name, yaml_config)


def get_validation(validation_name):
"""Return validation YAML for a specific connection."""
mgr = state_manager.StateManager()
return mgr.get_validation_config(validation_name)


def list_validations():
"""List all saved validation YAMLs."""
mgr = state_manager.StateManager()
validations = mgr.list_validations()

print("Validation YAMLs found:")
for validation_name in validations:
print(f"{validation_name}.yaml")


def get_labels(arg_labels):
""" Return list of tuples representing key-value label pairs. """
"""Return list of tuples representing key-value label pairs."""
labels = []
if arg_labels:
pairs = arg_labels.split(",")
Expand Down Expand Up @@ -672,7 +720,7 @@ def get_arg_list(arg_value, default_value=None):


def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
""" Returns dictionary of tables. Backwards compatible for JSON input.
"""Returns dictionary of tables. Backwards compatible for JSON input.

arg_table (str): tables_list argument specified
default_value (Any): A default value to supply when arg_value is empty.
Expand Down Expand Up @@ -728,7 +776,7 @@ def get_tables_list(arg_tables, default_value=None, is_filesystem=False):


def split_table(table_ref, schema_required=True):
""" Returns schema and table name given list of input values.
"""Returns schema and table name given list of input values.

table_ref (List): Table reference i.e ['my.schema.my_table']
schema_required (boolean): Indicates whether schema is required. A source
Expand Down
Loading