Skip to content

Commit

Permalink
Merge branch 'GoogleCloudPlatform:develop' into issue377-consistent-status-values
Browse files Browse the repository at this point in the history
  • Loading branch information
ajwelch4 committed Mar 2, 2022
2 parents a56165f + 806151a commit 8ecdce6
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 4 deletions.
11 changes: 9 additions & 2 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ def build_config_managers_from_args(args):

format = args.format if args.format else "table"

use_random_rows = (
None if config_type == consts.SCHEMA_VALIDATION else args.use_random_row
)
random_row_batch_size = (
None if config_type == consts.SCHEMA_VALIDATION else args.random_row_batch_size
)

is_filesystem = source_client._source_type == "FileSystem"
tables_list = cli_tools.get_tables_list(
args.tables_list, default_value=[], is_filesystem=is_filesystem
Expand All @@ -209,8 +216,8 @@ def build_config_managers_from_args(args):
labels,
threshold,
format,
use_random_rows=args.use_random_row,
random_row_batch_size=args.random_row_batch_size,
use_random_rows=use_random_rows,
random_row_batch_size=random_row_batch_size,
source_client=source_client,
target_client=target_client,
result_handler_config=result_handler_config,
Expand Down
8 changes: 7 additions & 1 deletion data_validation/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,13 @@ def get_yaml_validation_block(self):
def get_result_handler(self):
"""Return ResultHandler instance from supplied config."""
if not self.result_handler_config:
return TextResultHandler(self._config.get(consts.CONFIG_FORMAT, "table"))
if self.config[consts.CONFIG_TYPE] == consts.SCHEMA_VALIDATION:
cols_filter_list = consts.SCHEMA_VALIDATION_COLUMN_FILTER_LIST
else:
cols_filter_list = consts.COLUMN_FILTER_LIST
return TextResultHandler(
self._config.get(consts.CONFIG_FORMAT, "table"), cols_filter_list
)

result_type = self.result_handler_config[consts.CONFIG_TYPE]
if result_type == "BigQuery":
Expand Down
8 changes: 8 additions & 0 deletions data_validation/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,11 @@
"run_id",
"start_time",
]
# Result-handler columns filtered out of text output for schema validations
# (passed as the column filter to TextResultHandler); aggregation fields do
# not apply to schema checks and the run metadata is dropped with them.
SCHEMA_VALIDATION_COLUMN_FILTER_LIST = [
    "run_id",
    "start_time",
    "end_time",
    "aggregation_type",
    "source_agg_value",
    "target_agg_value",
]
7 changes: 7 additions & 0 deletions docs/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ Then `pip install pyodbc`.
```

## Hive
Please note that for Group By validations, the following property must be set in Hive:

`set hive:hive.groupby.orderby.position.alias=true`

If you are running Hive on Dataproc, you will also need to run
`pip install ibis-framework[impala]`

```
{
# Hive is based off Impala connector
Expand Down
2 changes: 1 addition & 1 deletion docs/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ After installing the CLI tool using the instructions below, you will be ready to

## Deploy Data Validation CLI on your machine

The Data Validation tooling requires Python 3.6+.
The Data Validation tooling requires Python 3.7+.

```
sudo apt-get install python3
Expand Down
20 changes: 20 additions & 0 deletions tests/system/data_sources/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@
consts.CONFIG_FORMAT: "table",
}

# Schema-validation config that compares a public BigQuery table against
# itself (source and target connections are both BQ_CONN), so every column
# is expected to match.
CONFIG_SCHEMA_VALIDATION = {
    # BigQuery Specific Connection Config
    consts.CONFIG_SOURCE_CONN: BQ_CONN,
    consts.CONFIG_TARGET_CONN: BQ_CONN,
    # Validation Type
    consts.CONFIG_TYPE: "Schema",
    # Configuration Required Depending on Validator Type
    consts.CONFIG_SCHEMA_NAME: "bigquery-public-data.new_york_citibike",
    consts.CONFIG_TABLE_NAME: "citibike_trips",
    consts.CONFIG_FORMAT: "table",
}

BQ_CONN_NAME = "bq-integration-test"
CLI_CONFIG_FILE = "example_test.yaml"

Expand Down Expand Up @@ -237,6 +249,14 @@ def test_numeric_types():
)


def test_schema_validation():
    """Run a self-join schema validation and require a Pass status per column."""
    validator = data_validation.DataValidation(CONFIG_SCHEMA_VALIDATION, verbose=True)
    result_df = validator.execute()

    records = result_df.to_dict(orient="records")
    for record in records:
        assert record["status"] == "Pass"


def test_cli_store_yaml_then_run_gcs():
"""Test storing and retrieving validation YAML when GCS env var is set."""
# Store BQ Connection
Expand Down
35 changes: 35 additions & 0 deletions third_party/ibis/ibis_impala/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

from ibis.backends.impala import connect
from ibis.backends.impala import udf
from ibis.backends.impala.client import ImpalaClient
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch

_impala_to_ibis_type = udf._impala_to_ibis_type

Expand Down Expand Up @@ -61,4 +63,37 @@ def parse_type(t):
raise Exception(t)


def get_schema(self, table_name, database=None):
    """Return an ibis Schema object for the indicated table and database.

    Parameters
    ----------
    table_name : string
        May be fully qualified.
    database : string, default None
        Database to qualify ``table_name`` with when not already qualified.

    Returns
    -------
    schema : ibis Schema
    """
    qualified_name = self._fully_qualified_name(table_name, database)
    query = "DESCRIBE {}".format(qualified_name)

    # Keep only the first two fields (name, type) of each DESCRIBE row.
    # A row whose first field is empty marks the end of the column listing
    # (e.g. the separator before partition/metadata sections), so stop there.
    pairs = []
    for row in self.con.fetchall(query):
        if row[0] == "":
            break
        pairs.append(row[:2])

    names, types = zip(*pairs)
    # NOTE: avoid naming the loop variable `type` — it shadows the builtin.
    ibis_types = [parse_type(data_type.lower()) for data_type in types]
    names = [name.lower() for name in names]

    return sch.Schema(names, ibis_types)


# Monkey-patch ibis' Impala backend with this module's implementations:
# replace udf.parse_type with the local parse_type, and install the
# DESCRIBE-based get_schema defined above as ImpalaClient.get_schema.
udf.parse_type = parse_type
ImpalaClient.get_schema = get_schema

0 comments on commit 8ecdce6

Please sign in to comment.