docs updates, typing fix, system test update
dmedora committed Nov 2, 2021
1 parent 8aad1bf commit a898069
Showing 4 changed files with 66 additions and 45 deletions.
README.md (6 changes: 5 additions & 1 deletion)
@@ -183,7 +183,9 @@ case specific CLI arguments or editing the saved YAML configuration file.
 
 For example, the following command creates a YAML file for the validation of the
 `new_york_citibike` table: `data-validation validate column -sc my_bq_conn -tc
 my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips -c
-citibike.yaml`
+citibike.yaml`.
+
+The validation config file is saved to the GCS path specified by the `PSO_DV_CONFIG_HOME` env variable if that has been set; otherwise, it is saved to the directory from which the tool is run.
 
 Here is the generated YAML file named `citibike.yaml`:
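(The YAML body itself is collapsed in this diff view.) To make the save-location rule above concrete, here is a minimal sketch of the resolution order, assuming only what the paragraph states; the helper name `resolve_config_dir` is hypothetical, and the tool's real logic lives in `data_validation/state_manager.py`:

```python
import os

def resolve_config_dir() -> str:
    # Hypothetical illustration: prefer the path in PSO_DV_CONFIG_HOME,
    # which may be local or a GCS path such as "gs://my-bucket/dvt-config/".
    config_home = os.environ.get("PSO_DV_CONFIG_HOME")
    if config_home:
        return config_home
    # Otherwise configs land in the directory the tool was run from.
    return os.getcwd()
```

With the variable set, the `citibike.yaml` above would be written under the bucket path rather than the working directory.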

@@ -219,6 +221,8 @@ data-validation run-config -c citibike.yaml
 View the complete YAML file for a GroupedColumn validation on the
 [examples](docs/examples.md#) page.
 
+You can view a list of all saved validation YAML files using `data-validation run-config list`.
+
 ### Aggregated Fields
 
 Aggregate fields contain the SQL fields that you want to produce an aggregate
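(The rest of this section is collapsed in this view.) The system-test configs later in this commit show the shape of an aggregate entry; here is a hedged sketch using constant names from `data_validation.consts`, with illustrative values, where `None` source and target columns stand in for a `COUNT(*)`:

```python
from data_validation import consts

# One illustrative aggregate entry, mirroring the CONFIG_AGGREGATES lists
# in the BigQuery system tests below: a COUNT(*) reported as "count".
count_aggregate = {
    consts.CONFIG_TYPE: "count",
    consts.CONFIG_SOURCE_COLUMN: None,
    consts.CONFIG_TARGET_COLUMN: None,
    consts.CONFIG_FIELD_ALIAS: "count",
}
```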
data_validation/state_manager.py (4 changes: 2 additions & 2 deletions)
@@ -99,7 +99,7 @@ def _get_connection_path(self, name: str) -> str:
         return os.path.join(self._get_connections_directory(),
                             f"{name}.connection.json")
 
-    def create_validation_yaml(self, name: str, yaml_config: dict[str, str]):
+    def create_validation_yaml(self, name: str, yaml_config: Dict[str, str]):
         """Create a validation file and store the given config as YAML.
 
         Args:
@@ -111,7 +111,7 @@ def create_validation_yaml(self, name: str, yaml_config: dict[str, str]):
         self._write_file(validation_path, yaml_config_str)
         print(yaml_config)
 
-    def get_validation_config(self, name: str) -> dict[str, str]:
+    def get_validation_config(self, name: str) -> Dict[str, str]:
         """Get a validation configuration from the expected file.
 
         Args:
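For context on the typing fix: subscripting the built-in `dict` in an annotation (PEP 585 syntax) is only evaluated successfully on Python 3.9+, so `dict[str, str]` raises a `TypeError` at import time on 3.8 and earlier unless `from __future__ import annotations` is in effect. `typing.Dict` is the backward-compatible spelling. A minimal sketch; the function below is a stand-in, not the real method body, which the diff elides:

```python
from typing import Dict

def get_validation_config_sketch(name: str) -> Dict[str, str]:
    # Annotating with dict[str, str] instead would fail on Python <= 3.8:
    # TypeError: 'type' object is not subscriptable
    return {"validation_name": name}  # placeholder body
```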
docs/connections.md (2 changes: 1 addition & 1 deletion)
@@ -8,7 +8,7 @@ a directory specified by the env variable `PSO_DV_CONFIG_HOME`.
 ## GCS Connection Management (recommended)
 
 The connections can also be stored in GCS using `PSO_DV_CONFIG_HOME`.
-To do so simply add the GCS path to the environment.
+To do so simply add the GCS path to the environment. Note that if this path is set, query validation configs will also be saved here.
 
 eg.
 `export PSO_DV_CONFIG_HOME=gs://my-bucket/my/connections/path/`
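How a config write against `PSO_DV_CONFIG_HOME` might branch between local disk and GCS is sketched below. This is illustrative only and assumes the `google-cloud-storage` client library; the tool's actual persistence logic lives in `data_validation/state_manager.py`:

```python
from google.cloud import storage

def write_config(path: str, contents: str) -> None:
    """Illustrative sketch: write a config to GCS or to the local filesystem."""
    if path.startswith("gs://"):
        # Split "gs://bucket/prefix/file" into bucket and blob names.
        bucket_name, _, blob_name = path[len("gs://"):].partition("/")
        storage.Client().bucket(bucket_name).blob(blob_name).upload_from_string(contents)
    else:
        with open(path, "w") as f:
            f.write(contents)
```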
tests/system/data_sources/test_bigquery.py (99 changes: 58 additions & 41 deletions)
@@ -17,19 +17,23 @@
from data_validation import cli_tools, consts, data_validation
from data_validation import __main__ as main


PROJECT_ID = os.environ["PROJECT_ID"]
os.environ[consts.ENV_DIRECTORY_VAR] = f"gs://{PROJECT_ID}/integration_tests/"
BQ_CONN = {"source_type": "BigQuery", "project_id": PROJECT_ID}
 CONFIG_COUNT_VALID = {
     # BigQuery Specific Connection Name
-    consts.CONFIG_SOURCE_CONN: BQ_CONN,
-    consts.CONFIG_TARGET_CONN: BQ_CONN,
+    consts.CONFIG_SOURCE_CONN:
+        BQ_CONN,
+    consts.CONFIG_TARGET_CONN:
+        BQ_CONN,
     # Validation Type
-    consts.CONFIG_TYPE: "Column",
+    consts.CONFIG_TYPE:
+        "Column",
     # Configuration Required Depending on Validator Type
-    consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike",
-    consts.CONFIG_TABLE_NAME: "citibike_trips",
+    consts.CONFIG_SCHEMA_NAME:
+        "bigquery-public-data.new_york_citibike",
+    consts.CONFIG_TABLE_NAME:
+        "citibike_trips",
     consts.CONFIG_GROUPED_COLUMNS: [],
     consts.CONFIG_AGGREGATES: [
         {
@@ -63,18 +67,24 @@
consts.CONFIG_FIELD_ALIAS: "min_birth_year",
},
],
consts.CONFIG_FORMAT: "table",
consts.CONFIG_FORMAT:
"table",
}

 CONFIG_GROUPED_COUNT_VALID = {
     # BigQuery Specific Connection Config
-    consts.CONFIG_SOURCE_CONN: BQ_CONN,
-    consts.CONFIG_TARGET_CONN: BQ_CONN,
+    consts.CONFIG_SOURCE_CONN:
+        BQ_CONN,
+    consts.CONFIG_TARGET_CONN:
+        BQ_CONN,
     # Validation Type
-    consts.CONFIG_TYPE: "Column",
+    consts.CONFIG_TYPE:
+        "Column",
     # Configuration Required Depending on Validator Type
-    consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike",
-    consts.CONFIG_TABLE_NAME: "citibike_trips",
+    consts.CONFIG_SCHEMA_NAME:
+        "bigquery-public-data.new_york_citibike",
+    consts.CONFIG_TABLE_NAME:
+        "citibike_trips",
     consts.CONFIG_AGGREGATES: [
         {
             consts.CONFIG_TYPE: "count",
@@ -97,19 +107,25 @@
             consts.CONFIG_CAST: "date",
         },
     ],
-    consts.CONFIG_FORMAT: "table",
+    consts.CONFIG_FORMAT:
+        "table",
 }

 # TODO: The definition for this table is stored in: ./tests/resources/
 CONFIG_NUMERIC_AGG_VALID = {
     # BigQuery Specific Connection Config
-    consts.CONFIG_SOURCE_CONN: BQ_CONN,
-    consts.CONFIG_TARGET_CONN: BQ_CONN,
+    consts.CONFIG_SOURCE_CONN:
+        BQ_CONN,
+    consts.CONFIG_TARGET_CONN:
+        BQ_CONN,
     # Validation Type
-    consts.CONFIG_TYPE: "Column",
+    consts.CONFIG_TYPE:
+        "Column",
     # Configuration Required Depending on Validator Type
-    consts.CONFIG_SCHEMA_NAME: "pso_data_validator",
-    consts.CONFIG_TABLE_NAME: "test_data_types",
+    consts.CONFIG_SCHEMA_NAME:
+        "pso_data_validator",
+    consts.CONFIG_TABLE_NAME:
+        "test_data_types",
     consts.CONFIG_AGGREGATES: [
         {
             consts.CONFIG_TYPE: "count",
@@ -131,7 +147,8 @@
         },
     ],
     consts.CONFIG_GROUPED_COLUMNS: [],
-    consts.CONFIG_FORMAT: "table",
+    consts.CONFIG_FORMAT:
+        "table",
 }

BQ_CONN_NAME = "bq-integration-test"
@@ -183,36 +200,34 @@


 def test_count_validator():
-    validator = data_validation.DataValidation(CONFIG_COUNT_VALID, verbose=True)
+    validator = data_validation.DataValidation(CONFIG_COUNT_VALID,
+                                               verbose=True)
     df = validator.execute()

-    count_value = df[df["validation_name"] == "count"]["source_agg_value"].values[0]
-    count_tripduration_value = df[df["validation_name"] == "count_tripduration"][
-        "source_agg_value"
-    ].values[0]
+    count_value = df[df["validation_name"] ==
+                     "count"]["source_agg_value"].values[0]
+    count_tripduration_value = df[
+        df["validation_name"] ==
+        "count_tripduration"]["source_agg_value"].values[0]
     avg_tripduration_value = df[df["validation_name"] == "avg_tripduration"][
-        "source_agg_value"
-    ].values[0]
-    max_birth_year_value = df[df["validation_name"] == "max_birth_year"][
-        "source_agg_value"
-    ].values[0]
-    min_birth_year_value = df[df["validation_name"] == "min_birth_year"][
-        "source_agg_value"
-    ].values[0]
+        "source_agg_value"].values[0]
+    max_birth_year_value = df[df["validation_name"] ==
+                              "max_birth_year"]["source_agg_value"].values[0]
+    min_birth_year_value = df[df["validation_name"] ==
+                              "min_birth_year"]["source_agg_value"].values[0]

     assert float(count_value) > 0
     assert float(count_tripduration_value) > 0
     assert float(avg_tripduration_value) > 0
     assert float(max_birth_year_value) > 0
     assert float(min_birth_year_value) > 0
-    assert (
-        df["source_agg_value"].astype(float).sum()
-        == df["target_agg_value"].astype(float).sum()
-    )
+    assert (df["source_agg_value"].astype(float).sum() ==
+            df["target_agg_value"].astype(float).sum())


 def test_grouped_count_validator():
-    validator = data_validation.DataValidation(CONFIG_GROUPED_COUNT_VALID, verbose=True)
+    validator = data_validation.DataValidation(CONFIG_GROUPED_COUNT_VALID,
+                                               verbose=True)
     df = validator.execute()
     rows = list(df[df["validation_name"] == "count"].iterrows())
@@ -228,13 +243,13 @@ def test_grouped_count_validator():


 def test_numeric_types():
-    validator = data_validation.DataValidation(CONFIG_NUMERIC_AGG_VALID, verbose=True)
+    validator = data_validation.DataValidation(CONFIG_NUMERIC_AGG_VALID,
+                                               verbose=True)
     df = validator.execute()
 
     for validation in df.to_dict(orient="records"):
         assert float(validation["source_agg_value"]) == float(
-            validation["target_agg_value"]
-        )
+            validation["target_agg_value"])


def test_cli_store_yaml_then_run():
@@ -246,7 +261,9 @@ def test_cli_store_yaml_then_run():
     mock_args = parser.parse_args(CLI_STORE_COLUMN_ARGS)
     main.run(mock_args)
 
-    yaml_file_path = CLI_CONFIG_FILE
+    # Look for YAML file in GCS env directory, since that has been set
+    yaml_file_path = os.path.join(os.environ[consts.ENV_DIRECTORY_VAR],
+                                  "validations/", CLI_CONFIG_FILE)
     with open(yaml_file_path, "r") as yaml_file:
         # The number of lines is not significant, except that it represents
         # the exact file expected to be created. Any change to this value
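One detail worth noting in the updated test: because the module sets `consts.ENV_DIRECTORY_VAR` (the `PSO_DV_CONFIG_HOME` variable described in the docs above) to a GCS path at import time, the expected YAML now resolves under that bucket. A small sketch of the join, with a hypothetical project ID and file name since `CLI_CONFIG_FILE` is not shown in this excerpt:

```python
import os

# Hypothetical values: the real ones come from PROJECT_ID and CLI_CONFIG_FILE.
os.environ["PSO_DV_CONFIG_HOME"] = "gs://my-project/integration_tests/"
yaml_file_path = os.path.join(os.environ["PSO_DV_CONFIG_HOME"],
                              "validations/", "citibike.yaml")
print(yaml_file_path)
# gs://my-project/integration_tests/validations/citibike.yaml
```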
