
feat: GCS support for validation configs #340

Merged: 8 commits, Feb 17, 2022
Changes from 5 commits
6 changes: 5 additions & 1 deletion README.md
@@ -183,7 +183,9 @@ case specific CLI arguments or editing the saved YAML configuration file.
For example, the following command creates a YAML file for the validation of the
`new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
citibike.yaml`
citibike.yaml`.

The validation config file is saved to the GCS path specified by the `PSO_DV_CONFIG_HOME` env variable if that has been set; otherwise, it is saved to the directory where the tool is run.
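
As a rough sketch of that behaviour (the helper name below is hypothetical; the real logic lives in `data_validation/state_manager.py`, added in this PR):

```python
import os

# Hypothetical helper for illustration only; it mirrors the path resolution
# implemented by StateManager._get_validations_directory in this PR.
def resolve_validation_config_path(config_file_name: str) -> str:
    """Return where a generated validation YAML would be written."""
    config_home = os.environ.get("PSO_DV_CONFIG_HOME", "")
    if config_home.startswith("gs://"):
        # GCS mode: configs go under a validations/ prefix in the bucket.
        return os.path.join(config_home, "validations/", config_file_name)
    # Local mode: the file is written to the current working directory.
    return os.path.join("./", config_file_name)

print(resolve_validation_config_path("citibike.yaml"))
```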

Here is the generated YAML file named `citibike.yaml`:

@@ -219,6 +221,8 @@ data-validation run-config -c citibike.yaml
View the complete YAML file for a GroupedColumn validation on the
[examples](docs/examples.md#) page.

You can view a list of all saved validation YAML files using `data-validation run-config list`.
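
Under the hood, the `list` subcommand dispatches to the new `cli_tools.list_validations` helper added in this PR; a minimal sketch of calling it directly:

```python
from data_validation import cli_tools

# Prints one "Validation YAML name: <name>" line per saved validation config,
# read from the local directory or the GCS validations/ prefix.
cli_tools.list_validations()
```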

### Aggregated Fields

Aggregate fields contain the SQL fields that you want to produce an aggregate
27 changes: 14 additions & 13 deletions data_validation/__main__.py
@@ -12,9 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import json
from yaml import dump, load, Dumper, Loader

from data_validation import (
cli_tools,
@@ -37,10 +35,8 @@ def _get_arg_config_file(args):

def _get_yaml_config_from_file(config_file_path):
"""Return Dict of yaml validation data."""
with open(config_file_path, "r") as yaml_file:
yaml_configs = load(yaml_file.read(), Loader=Loader)

return yaml_configs
yaml_config = cli_tools.get_validation(config_file_path)
return yaml_config


def get_aggregate_config(args, config_manager):
@@ -321,12 +317,9 @@ def store_yaml_config_file(args, config_managers):
Args:
config_managers (list[ConfigManager]): List of config manager instances.
"""
config_file_path = _get_arg_config_file(args)
yaml_configs = convert_config_to_yaml(args, config_managers)
yaml_config_str = dump(yaml_configs, Dumper=Dumper)

with open(config_file_path, "w") as yaml_file:
yaml_file.write(yaml_config_str)
config_file_path = _get_arg_config_file(args)
cli_tools.store_validation(config_file_path, yaml_configs)


def run(args):
@@ -352,6 +345,15 @@ def run_connections(args):
raise ValueError(f"Connections Argument '{args.connect_cmd}' is not supported")


def run_validation_config(args):
""" Run commands related to validation config YAMLs."""
if args.run_config_cmd == "list":
cli_tools.list_validations()
else:
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)


def validate(args):
""" Run commands related to data validation."""
if args.validate_cmd == "column" or args.validate_cmd == "schema":
@@ -369,8 +371,7 @@ def main():
elif args.command == "connections":
run_connections(args)
elif args.command == "run-config":
config_managers = build_config_managers_from_yaml(args)
run_validations(args, config_managers)
run_validation_config(args)
elif args.command == "find-tables":
print(find_tables_using_string_matching(args))
elif args.command == "query":
29 changes: 24 additions & 5 deletions data_validation/cli_tools.py
@@ -11,8 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""The Data Validation CLI tool is intended to help to build and execute
data validation runs with ease.

@@ -53,7 +51,6 @@
from data_validation import consts
from data_validation import state_manager


CONNECTION_SOURCE_FIELDS = {
"BigQuery": [
["project_id", "GCP Project to use for BigQuery"],
@@ -204,6 +201,9 @@ def _configure_run_config_parser(subparsers):
run_config_parser = subparsers.add_parser(
"run-config", help="Run validations stored in a YAML config file"
)
run_config_subparsers = run_config_parser.add_subparsers(dest="run_config_cmd")
_ = run_config_subparsers.add_parser("list", help="List your validation configs")
Collaborator:

Without having tested this, using an optional command parser is a slightly strange feature (and I'm not sure it's doable?).

The naming structure itself is slightly strange as well (e.g. `run-config list` is not easy to find and understand).

It seems to me that the goal here is to create a `configs` CLI section like we have with `connections`. If that's the case, perhaps do so more explicitly: `configs list` and `configs run`, and maybe also add `configs get`, though that is only a nice-to-have.

For backwards compatibility you'll need to keep `run-config`, though perhaps it can be a hidden command going forward.

@dmedora (Member, Author), Dec 13, 2021:

I agree, `run-config list` is a little strange. I was trying to stick with `run-config`, but adding a `configs` command makes sense. Will amend.

@dmedora (Member, Author):

Added the `configs` command (`run`, `list`, `get`), and left `run-config` in place. PTAL when you have a chance!
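
A hedged sketch of the `configs` command group agreed on above (the final wiring is not shown in this diff; the `configs_cmd` dest name and help strings are assumptions modeled on the existing `run-config` parser):

```python
import argparse

parser = argparse.ArgumentParser(prog="data-validation")
command_parsers = parser.add_subparsers(dest="command")

# Top-level "configs" group with list/get/run subcommands.
configs_parser = command_parsers.add_parser(
    "configs", help="Manage saved validation YAML configs"
)
configs_subparsers = configs_parser.add_subparsers(dest="configs_cmd")

configs_subparsers.add_parser("list", help="List saved validation configs")

get_parser = configs_subparsers.add_parser("get", help="Print a saved validation config")
get_parser.add_argument("--config-file", "-c", help="Validation YAML file name")

run_parser = configs_subparsers.add_parser("run", help="Run a saved validation config")
run_parser.add_argument("--config-file", "-c", help="Validation YAML file name")

args = parser.parse_args(["configs", "list"])
print(args.command, args.configs_cmd)  # -> configs list
```

Keeping `run-config` registered alongside `configs run` preserves backwards compatibility, as noted in the thread.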


run_config_parser.add_argument(
"--config-file",
"-c",
@@ -494,7 +494,6 @@ def threshold_float(x):
# os.makedirs(dir_path)
# return dir_path


# def _get_connection_file(connection_name):
# dir_path = _get_data_validation_directory()
# file_name = f"{connection_name}.connection.json"
@@ -518,7 +517,6 @@ def store_connection(connection_name, conn):
# with open(file_path, "w") as file:
# file.write(json.dumps(conn))


# def get_connections():
# """ Return dict with connection name and path key pairs."""
# connections = {}
@@ -555,6 +553,27 @@ def get_connection(connection_name):
# return json.loads(conn_str)


def store_validation(validation_file_name, yaml_config):
""" Store the validation YAML config under the given name."""
mgr = state_manager.StateManager()
mgr.create_validation_yaml(validation_file_name, yaml_config)


def get_validation(validation_name):
""" Return validation YAML for a specific connection."""
mgr = state_manager.StateManager()
return mgr.get_validation_config(validation_name)


def list_validations():
""" List all saved validation YAMLs."""
mgr = state_manager.StateManager()
validations = mgr.list_validations()

for validation_name in validations:
print(f"Validation YAML name: {validation_name}")


def get_labels(arg_labels):
""" Return list of tuples representing key-value label pairs. """
labels = []
57 changes: 53 additions & 4 deletions data_validation/state_manager.py
@@ -22,6 +22,7 @@
import os
from google.cloud import storage
from typing import Dict, List
from yaml import dump, load, Dumper, Loader

from data_validation import client_info
from data_validation import consts
@@ -93,13 +94,59 @@ def _get_connections_directory(self) -> str:
def _get_connection_path(self, name: str) -> str:
"""Returns the full path to a connection.

Args:
name: The name of the connection.
"""
return os.path.join(
self._get_connections_directory(), f"{name}.connection.json"
)

def create_validation_yaml(self, name: str, yaml_config: Dict[str, str]):
"""Create a validation file and store the given config as YAML.

Args:
name (String): The name of the validation.
yaml_config (Dict): A dictionary with the validation details.
"""
validation_path = self._get_validation_path(name)
yaml_config_str = dump(yaml_config, Dumper=Dumper)
self._write_file(validation_path, yaml_config_str)

def get_validation_config(self, name: str) -> Dict[str, str]:
"""Get a validation configuration from the expected file.

Args:
name: The name of the validation.
Returns:
A dict of the validation values from the file.
"""
validation_path = self._get_validation_path(name)
validation_bytes = self._read_file(validation_path)
return load(validation_bytes, Loader=Loader)

def list_validations(self):
"""Returns the list of saved validation config names."""
file_names = self._list_directory(self._get_validations_directory())
return [
file_name.split(".")[0]
for file_name in file_names
if file_name.endswith(".yaml")
]

def _get_validations_directory(self):
"""Returns the validations directory path."""
if self.file_system == FileSystem.LOCAL:
# Validation configs should be written to tool root dir, not consts.DEFAULT_ENV_DIRECTORY as connections are
return "./"
return os.path.join(self.file_system_root_path, "validations/")

def _get_validation_path(self, name: str) -> str:
"""Returns the full path to a validation.

Args:
name: The name of the validation.
"""
return os.path.join(self._get_validations_directory(), f"{name}")

def _read_file(self, file_path: str) -> str:
if self.file_system == FileSystem.GCS:
return self._read_gcs_file(file_path)
@@ -113,6 +160,8 @@ def _write_file(self, file_path: str, data: str):
with open(file_path, "w") as file:
file.write(data)

print("Success! Config output written to {}".format(file_path))

def _list_directory(self, directory_path: str) -> List[str]:
if self.file_system == FileSystem.GCS:
return self._list_gcs_directory(directory_path)
@@ -154,7 +203,7 @@ def _read_gcs_file(self, file_path: str) -> str:
gcs_file_path = self._get_gcs_file_path(file_path)
blob = self.gcs_bucket.get_blob(gcs_file_path)

return blob.download_as_string()
return blob.download_as_bytes()

def _write_gcs_file(self, file_path: str, data: str):
gcs_file_path = self._get_gcs_file_path(file_path)
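
For orientation, a minimal usage sketch of the validation helpers added to `StateManager` above; the config dict is placeholder content rather than a real validation config, and local vs. GCS storage is chosen by `PSO_DV_CONFIG_HOME` as elsewhere in this PR:

```python
from data_validation import state_manager

# StateManager() picks local or GCS storage from the configured env directory,
# mirroring the existing behaviour for connections.
mgr = state_manager.StateManager()

# Dump a placeholder dict to YAML and store it as citibike.yaml.
mgr.create_validation_yaml("citibike.yaml", {"example_key": "example_value"})

# List saved validation names (file names with the .yaml suffix stripped).
print(mgr.list_validations())

# Read the config back as a dict.
print(mgr.get_validation_config("citibike.yaml"))
```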
2 changes: 1 addition & 1 deletion docs/connections.md
@@ -8,7 +8,7 @@ a directory specified by the env variable `PSO_DV_CONFIG_HOME`.
## GCS Connection Management (recommended)

The connections can also be stored in GCS using `PSO_DV_CONFIG_HOME`.
To do so simply add the GCS path to the environment.
To do so, simply add the GCS path to the environment. Note that if this path is set, validation configs will also be saved there.

eg.
`export PSO_DV_CONFIG_HOME=gs://my-bucket/my/connections/path/`
47 changes: 43 additions & 4 deletions tests/system/data_sources/test_bigquery.py
@@ -14,10 +14,9 @@

import os

from data_validation import cli_tools, consts, data_validation
from data_validation import cli_tools, consts, data_validation, state_manager
from data_validation import __main__ as main


PROJECT_ID = os.environ["PROJECT_ID"]
os.environ[consts.ENV_DIRECTORY_VAR] = f"gs://{PROJECT_ID}/integration_tests/"
BQ_CONN = {"source_type": "BigQuery", "project_id": PROJECT_ID}
@@ -167,6 +166,7 @@
"--config-file",
CLI_CONFIG_FILE,
]
EXPECTED_NUM_YAML_LINES = 33  # Expected number of lines in the validation config generated by CLI_STORE_COLUMN_ARGS
CLI_RUN_CONFIG_ARGS = ["run-config", "--config-file", CLI_CONFIG_FILE]

CLI_FIND_TABLES_ARGS = [
@@ -237,7 +237,43 @@ def test_numeric_types():
)


def test_cli_store_yaml_then_run():
def test_cli_store_yaml_then_run_gcs():
"""Test storing and retrieving validation YAML when GCS env var is set."""
# Store BQ Connection
_store_bq_conn()

# Build validation and store to file
parser = cli_tools.configure_arg_parser()
mock_args = parser.parse_args(CLI_STORE_COLUMN_ARGS)
main.run(mock_args)

# Look for YAML file in GCS env directory, since that has been set
yaml_file_path = os.path.join(
os.environ[consts.ENV_DIRECTORY_VAR], "validations/", CLI_CONFIG_FILE
)

# The number of lines is not significant, except that it represents
# the exact file expected to be created. Any change to this value
# is likely to be a breaking change and must be assessed.
mgr = state_manager.StateManager()
validation_bytes = mgr._read_file(yaml_file_path)
yaml_file_str = validation_bytes.decode("utf-8")
assert len(yaml_file_str.splitlines()) == EXPECTED_NUM_YAML_LINES

# Run generated config
run_config_args = parser.parse_args(CLI_RUN_CONFIG_ARGS)
config_managers = main.build_config_managers_from_yaml(run_config_args)
main.run_validations(run_config_args, config_managers)

# _remove_bq_conn()


def test_cli_store_yaml_then_run_local():
"""Test storing and retrieving validation YAML when GCS env var not set."""
# Unset GCS env var so that YAML is saved locally
gcs_path = os.environ[consts.ENV_DIRECTORY_VAR]
os.environ[consts.ENV_DIRECTORY_VAR] = ""

# Store BQ Connection
_store_bq_conn()

@@ -251,7 +287,7 @@ def test_cli_store_yaml_then_run():
# The number of lines is not significant, except that it represents
# the exact file expected to be created. Any change to this value
# is likely to be a breaking change and must be assessed.
assert len(yaml_file.readlines()) == 33
assert len(yaml_file.readlines()) == EXPECTED_NUM_YAML_LINES

# Run generated config
run_config_args = parser.parse_args(CLI_RUN_CONFIG_ARGS)
@@ -261,6 +297,9 @@ def test_cli_find_tables():
os.remove(yaml_file_path)
# _remove_bq_conn()

# Re-set GCS env var
os.environ[consts.ENV_DIRECTORY_VAR] = gcs_path


def test_cli_find_tables():
_store_bq_conn()