Skip to content

Commit

Permalink
feat: GCS support for validation configs (#340)
Browse files Browse the repository at this point in the history
* gcs support for validation configs, incl. get and list functionality, and new 'configs' cmd
  • Loading branch information
dmedora committed Feb 17, 2022
1 parent 100b3ea commit b09cd29
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 48 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ case specific CLI arguments or editing the saved YAML configuration file.
For example, the following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`
citibike.yaml`.

The validation config file is saved to the GCS path specified by the `PSO_DV_CONFIG_HOME` environment variable if it has been set; otherwise, it is saved to the directory from which the tool is run.

Here is the generated YAML file named `citibike.yaml`:

Expand Down Expand Up @@ -233,12 +235,14 @@ Once the file is updated and saved, the following command runs the new
validation:

```
data-validation run-config -c citibike.yaml
data-validation configs run -c citibike.yaml
```

View the complete YAML file for a GroupedColumn validation on the
[examples](docs/examples.md#) page.

You can view a list of all saved validation YAML files using `data-validation configs list`, and print a YAML config using `data-validation configs get -c citibike.yaml`.

### Aggregated Fields

Aggregate fields contain the SQL fields that you want to produce an aggregate
Expand Down
47 changes: 33 additions & 14 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import logging
import json
from yaml import dump, load, Dumper, Loader

from data_validation import (
cli_tools,
Expand All @@ -28,6 +27,9 @@
from data_validation.config_manager import ConfigManager
from data_validation.data_validation import DataValidation

from yaml import dump
import sys


def _get_arg_config_file(args):
"""Return String yaml config file path."""
Expand All @@ -39,10 +41,8 @@ def _get_arg_config_file(args):

def _get_yaml_config_from_file(config_file_path):
"""Return Dict of yaml validation data."""
with open(config_file_path, "r") as yaml_file:
yaml_configs = load(yaml_file.read(), Loader=Loader)

return yaml_configs
yaml_config = cli_tools.get_validation(config_file_path)
return yaml_config


def get_aggregate_config(args, config_manager):
Expand Down Expand Up @@ -336,12 +336,9 @@ def store_yaml_config_file(args, config_managers):
Args:
config_managers (list[ConfigManager]): List of config manager instances.
"""
config_file_path = _get_arg_config_file(args)
yaml_configs = convert_config_to_yaml(args, config_managers)
yaml_config_str = dump(yaml_configs, Dumper=Dumper)

with open(config_file_path, "w") as yaml_file:
yaml_file.write(yaml_config_str)
config_file_path = _get_arg_config_file(args)
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
Expand All @@ -355,7 +352,7 @@ def run(args):


def run_connections(args):
""" Run commands related to connection management."""
"""Run commands related to connection management."""
if args.connect_cmd == "list":
cli_tools.list_connections()
elif args.connect_cmd == "add":
Expand All @@ -367,8 +364,29 @@ def run_connections(args):
raise ValueError(f"Connections Argument '{args.connect_cmd}' is not supported")


def run_config(args):
    """Run the validations described by a YAML config file.

    Legacy handler for the `run-config` command; superseded by
    run_validation_configs.
    """
    run_validations(args, build_config_managers_from_yaml(args))


def run_validation_configs(args):
    """Run commands related to validation config YAMLs.

    Dispatches on ``args.validation_config_cmd``:
      * "run": build config managers from the YAML config and run them.
      * "list": list the saved validation configs.
      * "get": load the requested validation config and dump it to stdout.

    Args:
        args: Parsed CLI arguments for the `configs` command.

    Raises:
        ValueError: If the sub-command is not one of the values above.
    """
    if args.validation_config_cmd == "run":
        config_managers = build_config_managers_from_yaml(args)
        run_validations(args, config_managers)
    elif args.validation_config_cmd == "list":
        cli_tools.list_validations()
    elif args.validation_config_cmd == "get":
        # Named yaml_config (not yaml) so the local is not mistaken for the
        # yaml package.
        yaml_config = cli_tools.get_validation(_get_arg_config_file(args))
        dump(yaml_config, sys.stdout)
    else:
        # Report the attribute this function actually dispatches on; the
        # `validate_cmd` attribute belongs to a different command and is not
        # guaranteed to exist on these args.
        raise ValueError(
            f"Configs argument '{args.validation_config_cmd}' is not supported"
        )


def validate(args):
""" Run commands related to data validation."""
"""Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
run(args)
else:
Expand All @@ -384,8 +402,9 @@ def main():
elif args.command == "connections":
run_connections(args)
elif args.command == "run-config":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
run_config(args)
elif args.command == "configs":
run_validation_configs(args)
elif args.command == "find-tables":
print(find_tables_using_string_matching(args))
elif args.command == "query":
Expand Down
82 changes: 65 additions & 17 deletions data_validation/cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""The Data Validation CLI tool is intended to help to build and execute
data validation runs with ease.
Expand Down Expand Up @@ -53,7 +51,6 @@
from data_validation import consts
from data_validation import state_manager


CONNECTION_SOURCE_FIELDS = {
"BigQuery": [
["project_id", "GCP Project to use for BigQuery"],
Expand Down Expand Up @@ -133,7 +130,7 @@


def get_parsed_args():
""" Return ArgParser with configured CLI arguments."""
"""Return ArgParser with configured CLI arguments."""
parser = configure_arg_parser()
return parser.parse_args()

Expand All @@ -149,6 +146,7 @@ def configure_arg_parser():
subparsers = parser.add_subparsers(dest="command")
_configure_validate_parser(subparsers)
_configure_run_config_parser(subparsers)
_configure_validation_config_parser(subparsers)
_configure_connection_parser(subparsers)
_configure_find_tables(subparsers)
_configure_raw_query(subparsers)
Expand Down Expand Up @@ -209,19 +207,49 @@ def _configure_raw_query(subparsers):


def _configure_run_config_parser(subparsers):
""" Configure arguments to run a data validation YAML config."""
"""Configure arguments to run a data validation YAML config using the legacy run-config command."""
run_config_parser = subparsers.add_parser(
"run-config", help="Run validations stored in a YAML config file"
"run-config",
help="Run validations stored in a YAML config file. Note: the 'configs run' command is now the recommended approach",
)

run_config_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)


def _configure_validation_config_parser(subparsers):
"""Configure arguments to run a data validation YAML config."""
validation_config_parser = subparsers.add_parser(
"configs", help="Run validations stored in a YAML config file"
)
configs_subparsers = validation_config_parser.add_subparsers(
dest="validation_config_cmd"
)
_ = configs_subparsers.add_parser("list", help="List your validation configs")
run_parser = configs_subparsers.add_parser(
"run", help="Run your validation configs"
)
run_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)

get_parser = configs_subparsers.add_parser(
"get", help="Get and print a validation config"
)
get_parser.add_argument(
"--config-file",
"-c",
help="YAML Config File Path to be used for building or running validations.",
)


def _configure_run_parser(subparsers):
""" Configure arguments to run a data validation."""
"""Configure arguments to run a data validation."""

# subparsers = parser.add_subparsers(dest="command")

Expand Down Expand Up @@ -327,7 +355,7 @@ def _configure_run_parser(subparsers):


def _configure_connection_parser(subparsers):
""" Configure the Parser for Connection Management. """
"""Configure the Parser for Connection Management."""
connection_parser = subparsers.add_parser(
"connections", help="Manage & Store connections to your Databases"
)
Expand Down Expand Up @@ -487,7 +515,7 @@ def _add_common_arguments(parser):


def get_connection_config_from_args(args):
""" Return dict with connection config supplied."""
"""Return dict with connection config supplied."""
config = {consts.SOURCE_TYPE: args.connect_type}

if args.connect_type == "Raw":
Expand Down Expand Up @@ -525,7 +553,6 @@ def threshold_float(x):
# os.makedirs(dir_path)
# return dir_path


# def _get_connection_file(connection_name):
# dir_path = _get_data_validation_directory()
# file_name = f"{connection_name}.connection.json"
Expand All @@ -538,7 +565,7 @@ def _generate_random_name(conn):


def store_connection(connection_name, conn):
""" Store the connection config under the given name."""
"""Store the connection config under the given name."""
mgr = state_manager.StateManager()
mgr.create_connection(connection_name, conn)

Expand All @@ -549,7 +576,6 @@ def store_connection(connection_name, conn):
# with open(file_path, "w") as file:
# file.write(json.dumps(conn))


# def get_connections():
# """ Return dict with connection name and path key pairs."""
# connections = {}
Expand All @@ -567,7 +593,7 @@ def store_connection(connection_name, conn):


def list_connections():
""" List all saved connections."""
"""List all saved connections."""
mgr = state_manager.StateManager()
connections = mgr.list_connections()

Expand All @@ -576,7 +602,7 @@ def list_connections():


def get_connection(connection_name):
""" Return dict connection details for a specific connection."""
"""Return dict connection details for a specific connection."""
mgr = state_manager.StateManager()
return mgr.get_connection_config(connection_name)

Expand All @@ -586,8 +612,30 @@ def get_connection(connection_name):
# return json.loads(conn_str)


def store_validation(validation_file_name, yaml_config):
    """Persist a validation YAML config under the given file name.

    Delegates to ``create_validation_yaml`` on a freshly created StateManager.
    """
    state_manager.StateManager().create_validation_yaml(
        validation_file_name, yaml_config
    )


def get_validation(validation_name):
    """Return the validation config stored under the given name.

    The value is whatever ``StateManager.get_validation_config`` returns for
    ``validation_name``.
    """
    return state_manager.StateManager().get_validation_config(validation_name)


def list_validations():
    """Print a header followed by the name of every saved validation YAML."""
    saved_names = state_manager.StateManager().list_validations()

    print("Validation YAMLs found:")
    # Each name is printed with a ".yaml" suffix appended.
    for saved_name in saved_names:
        print(f"{saved_name}.yaml")


def get_labels(arg_labels):
""" Return list of tuples representing key-value label pairs. """
"""Return list of tuples representing key-value label pairs."""
labels = []
if arg_labels:
pairs = arg_labels.split(",")
Expand Down Expand Up @@ -672,7 +720,7 @@ def get_arg_list(arg_value, default_value=None):


def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
""" Returns dictionary of tables. Backwards compatible for JSON input.
"""Returns dictionary of tables. Backwards compatible for JSON input.
arg_table (str): tables_list argument specified
default_value (Any): A default value to supply when arg_value is empty.
Expand Down Expand Up @@ -728,7 +776,7 @@ def get_tables_list(arg_tables, default_value=None, is_filesystem=False):


def split_table(table_ref, schema_required=True):
""" Returns schema and table name given list of input values.
"""Returns schema and table name given list of input values.
table_ref (List): Table reference i.e ['my.schema.my_table']
schema_required (boolean): Indicates whether schema is required. A source
Expand Down
Loading

0 comments on commit b09cd29

Please sign in to comment.