feat: Added FileSystem connection type (#254)
* Added FileSystem connection type, made schema optional for file validations

* feat: added filesystem connection type and made schema optional for file validations

* resolving merge conflict
nehanene15 committed May 27, 2021
1 parent c968024 commit be7824d
Showing 11 changed files with 91 additions and 28 deletions.
12 changes: 9 additions & 3 deletions README.md
@@ -108,9 +108,15 @@ used to run powerful validations without writing any queries.

There are many occasions where you need to explore a data source while running
validations. To avoid the need to open and install a new client, the CLI allows
you to run custom queries. `data-validation query --conn connection-name The
named connection to be queried. --query, -q The Raw query to run against the
supplied connection`
you to run custom queries.

```
data-validation query
--conn or -c CONN
The connection name to be queried
--query or -q QUERY
The raw query to run against the supplied connection
```

## Query Configurations

6 changes: 5 additions & 1 deletion data_validation/__main__.py
@@ -127,7 +127,11 @@ def build_config_managers_from_args(args):
target_client = clients.get_data_client(target_conn)

threshold = args.threshold if args.threshold else 0.0
tables_list = cli_tools.get_tables_list(args.tables_list, default_value=[])

is_filesystem = True if source_conn["source_type"] == "FileSystem" else False
tables_list = cli_tools.get_tables_list(
args.tables_list, default_value=[], is_filesystem=is_filesystem
)

for table_obj in tables_list:
config_manager = ConfigManager.build_config_manager(
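For context on the new flag: with `is_filesystem=True`, `get_tables_list` accepts bare table names instead of requiring `schema.table` references. A rough sketch of the expected shapes (table names below are hypothetical, and the `schema_name: None` result assumes `split_table` returns `None` when a schema is not required):

```python
from data_validation import cli_tools

# A SQL source still needs a schema-qualified name ...
sql_tables = cli_tools.get_tables_list("my_schema.my_table", default_value=[])
# ... while a FileSystem source may pass a bare table name.
fs_tables = cli_tools.get_tables_list(
    "my_local_file", default_value=[], is_filesystem=True
)

print(sql_tables)  # [{'schema_name': 'my_schema', 'table_name': 'my_table'}]
print(fs_tables)   # [{'schema_name': None, 'table_name': 'my_local_file'}]
```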
17 changes: 14 additions & 3 deletions data_validation/cli_tools.py
@@ -113,6 +113,11 @@
["database_id", "ID of Spanner database (schema) to connect to"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
],
"FileSystem": [
["table_name", "Table name to use as reference for file data"],
["file_path", "The local, s3, or GCS file path to the data"],
["file_type", "The file type of the file.'csv' or 'json'"],
],
}


@@ -491,11 +496,12 @@ def get_arg_list(arg_value, default_value=None):
return arg_list


def get_tables_list(arg_tables, default_value=None):
def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
""" Returns dictionary of tables. Backwards compatible for JSON input.
arg_table (str): tables_list argument specified
default_value (Any): A default value to supply when arg_value is empty.
is_filesystem (bool): Whether the source connection is a FileSystem, in which case a schema is not required.
"""
if not arg_tables:
return default_value
@@ -506,17 +512,22 @@ def get_tables_list(arg_tables, default_value=None):
except json.decoder.JSONDecodeError:
tables_list = []
tables_mapping = list(csv.reader([arg_tables]))[0]
source_schema_required = False if is_filesystem else True

for mapping in tables_mapping:
tables_map = mapping.split("=")
if len(tables_map) == 1:
schema, table = split_table(tables_map)
schema, table = split_table(
tables_map, schema_required=source_schema_required
)
table_dict = {
"schema_name": schema,
"table_name": table,
}
elif len(tables_map) == 2:
src_schema, src_table = split_table([tables_map[0]])
src_schema, src_table = split_table(
[tables_map[0]], schema_required=source_schema_required
)

table_dict = {
"schema_name": src_schema,
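The hunks above assume `split_table` now takes a `schema_required` flag; its body is outside this diff, so here is a minimal sketch of the behavior the callers rely on (not the actual `cli_tools.split_table` implementation):

```python
def split_table(table_ref, schema_required=True):
    """Split ["schema.table"] into (schema, table).

    Minimal sketch of the behavior assumed by the callers above,
    not the actual cli_tools.split_table implementation.
    """
    parts = table_ref[0].split(".")
    if len(parts) == 1:
        if schema_required:
            raise ValueError(f"Missing schema in table reference: {table_ref[0]}")
        return None, parts[0]
    # Everything before the last dot is the schema, which covers
    # dotted BigQuery references like "project.dataset.table".
    return ".".join(parts[:-1]), parts[-1]


assert split_table(["my_schema.my_table"]) == ("my_schema", "my_table")
assert split_table(["my_file"], schema_required=False) == (None, "my_file")
```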
2 changes: 1 addition & 1 deletion data_validation/clients.py
@@ -214,7 +214,7 @@ def get_data_client(connection_config):
"Impala": impala_connect,
"MySQL": MySQLClient,
"Oracle": OracleClient,
"Pandas": get_pandas_client,
"FileSystem": get_pandas_client,
"Postgres": PostgreSQLClient,
"Redshift": PostgreSQLClient,
"Teradata": TeradataClient,
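This hunk only renames the client key from `Pandas` to `FileSystem`; `get_pandas_client` itself is untouched by the commit. As a hedged illustration of what such a client amounts to (the real implementation lives elsewhere in `clients.py` and may differ):

```python
import ibis
import pandas as pd


def get_pandas_client(table_name, file_path, file_type):
    """Illustrative sketch only: back a FileSystem connection with an
    in-memory pandas table served through Ibis' pandas backend."""
    if file_type == "csv":
        # gs:// and s3:// paths additionally need gcsfs/s3fs installed.
        df = pd.read_csv(file_path)
    elif file_type == "json":
        df = pd.read_json(file_path)
    else:
        raise ValueError(f"Unsupported file_type: {file_type}")
    return ibis.pandas.connect({table_name: df})
```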
20 changes: 12 additions & 8 deletions data_validation/config_manager.py
@@ -133,7 +133,7 @@ def filters(self):
@property
def source_schema(self):
"""Return string value of source schema."""
return self._config[consts.CONFIG_SCHEMA_NAME]
return self._config.get(consts.CONFIG_SCHEMA_NAME, None)

@property
def source_table(self):
@@ -143,9 +143,7 @@ def source_table(self):
@property
def target_schema(self):
"""Return string value of target schema."""
return self._config.get(
consts.CONFIG_TARGET_SCHEMA_NAME, self._config[consts.CONFIG_SCHEMA_NAME]
)
return self._config.get(consts.CONFIG_TARGET_SCHEMA_NAME, self.source_schema)

@property
def target_table(self):
@@ -283,11 +281,7 @@ def build_config_manager(
consts.CONFIG_TYPE: config_type,
consts.CONFIG_SOURCE_CONN: source_conn,
consts.CONFIG_TARGET_CONN: target_conn,
consts.CONFIG_SCHEMA_NAME: table_obj[consts.CONFIG_SCHEMA_NAME],
consts.CONFIG_TABLE_NAME: table_obj[consts.CONFIG_TABLE_NAME],
consts.CONFIG_TARGET_SCHEMA_NAME: table_obj.get(
consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME]
),
consts.CONFIG_TARGET_TABLE_NAME: table_obj.get(
consts.CONFIG_TARGET_TABLE_NAME, table_obj[consts.CONFIG_TABLE_NAME]
),
@@ -297,6 +291,16 @@
consts.CONFIG_FILTERS: filter_config,
}

# Only FileSystem connections do not require schemas
if source_conn["source_type"] != "FileSystem":
config[consts.CONFIG_SCHEMA_NAME] = table_obj[consts.CONFIG_SCHEMA_NAME]
config[consts.CONFIG_TARGET_SCHEMA_NAME] = table_obj.get(
consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME]
)
else:
config[consts.CONFIG_SCHEMA_NAME] = None
config[consts.CONFIG_TARGET_SCHEMA_NAME] = None

return ConfigManager(config, source_client, target_client, verbose=verbose)

def build_config_grouped_columns(self, grouped_columns):
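The net effect of these property changes is that schema lookups no longer raise `KeyError` when a FileSystem config stores `None` for its schema keys. A condensed sketch of the accessor logic, assuming `consts.CONFIG_SCHEMA_NAME == "schema_name"` and `consts.CONFIG_TARGET_SCHEMA_NAME == "target_schema_name"`:

```python
class SchemaAccessors:
    """Condensed sketch of the two properties above, outside ConfigManager."""

    def __init__(self, config):
        self._config = config

    @property
    def source_schema(self):
        return self._config.get("schema_name", None)

    @property
    def target_schema(self):
        # Falls back to the source schema when no target schema is set.
        return self._config.get("target_schema_name", self.source_schema)


# A FileSystem config now stores None explicitly, so both resolve to None:
fs = SchemaAccessors({"schema_name": None, "table_name": "my_table"})
assert fs.source_schema is None and fs.target_schema is None
```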
27 changes: 22 additions & 5 deletions docs/connections.md
@@ -9,17 +9,17 @@ These commands can be used to create connections:

## Command template to create a connection:
```
data-validation connections add --connection-name my-conn-name source_type
data-validation connections add --connection-name CONN_NAME source_type
```

## Create a sample BigQuery connection:
```
data-validation connections add --connection-name MY-BQ-CONNECTION BigQuery --project-id MY-GCP-PROJECT
data-validation connections add --connection-name MY_BQ_CONN BigQuery --project-id MY_GCP_PROJECT
```

## Create a sample Teradata connection:
```
data-validation connections add --connection-name MY-TD-CONNECTION Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
data-validation connections add --connection-name MY_TD_CONN Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
```

## List existing connections
@@ -42,13 +42,14 @@ The data validation tool supports the following connection types.
* [Postgres](#postgres)
* [MySQL](#mysql)
* [Redshift](#redshift)
* [FileSystem](#filesystem)

As you see above, Teradata and BigQuery have different sets of custom arguments (for example project_id for BQ versus host for Teradata).

Every connection type requires its own configuration for connectivity. To find out the parameters for each connection type, use the following command.

```
data-validation connections add -c '<name>' <connection type> -h
data-validation connections add -c CONN_NAME <connection type> -h
```

Below is the expected configuration for each type.
@@ -91,7 +92,7 @@ Below is the expected configuration for each type.
# Configuration Required for All Data Sources
"source_type": "Spanner",
# GCP Project to use for Spanne
# GCP Project to use for Spanner
"project_id": "my-project-name",
# ID of Spanner instance to connect to
@@ -220,3 +221,19 @@ Then `pip install pyodbc`.
}
```

## FileSystem
```
{
# Configuration Required for All Data Sources
"source_type": "FileSystem",
# Table name to use as a reference for file data
"table_name": "my_table_name",
# The local, s3, or GCS file path to the data
"file_path": "gs://path/to/file",
# The file type. Either 'csv' or 'json'
"file_type": "csv"
}
```
15 changes: 13 additions & 2 deletions docs/examples.md
@@ -2,9 +2,9 @@
This page describes some basic use cases of the tool.

**PLEASE NOTE:** In the commands below, my_bq_conn refers to the connection name for your BigQuery project. We are validating BigQuery tables that are
available in BigQuery public datasets.
available in BigQuery public datasets. These examples validate a table against itself for demonstration purposes.

Also, note that if no aggregation flag is provided, the tool will run a 'COUNT *' default aggregation.
Also, note that if no aggregation flag is provided, the tool will run a 'COUNT *' as the default aggregation.

#### Simple COUNT(*) on a table
````shell script
@@ -110,6 +110,17 @@ data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquer
data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -l tag=test-run,owner=name
````

#### Run validation on a file
````shell script
# Additional dependencies needed for GCS files
pip install gcsfs
pip install fsspec

data-validation connections add --connection-name file_conn FileSystem --table-name my_local_file --file-path gs://path/to/file --file-type csv

data-validation run -t Column -sc file_conn -tc file_conn -tbls my_local_file --count name
````

#### Run custom SQL
````shell script
data-validation query
2 changes: 1 addition & 1 deletion tests/unit/test_clients.py
@@ -30,7 +30,7 @@
SOURCE_TABLE_FILE_PATH = "source_table_data.json"
JSON_DATA = """[{"col_a":0,"col_b":"a"},{"col_a":1,"col_b":"b"}]"""
SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
10 changes: 10 additions & 0 deletions tests/unit/test_config_manager.py
@@ -101,6 +101,16 @@ def test_config_property(module_under_test):
assert config == config_manager._config


def test_schema_property(module_under_test):
"""Test getting schema."""
config_manager = module_under_test.ConfigManager(
SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
)

target_schema = config_manager.target_schema
assert target_schema == "bigquery-public-data.new_york_citibike"


def test_filters_property(module_under_test):
config_manager = module_under_test.ConfigManager(
SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
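A natural companion to the new test, sketched here rather than taken from the commit, would cover the FileSystem path where both schema keys are `None` (the key names and the `SAMPLE_CONFIG` fixture are assumptions about this test module):

```python
def test_schema_property_filesystem(module_under_test):
    """Hypothetical companion test, not part of this commit."""
    fs_config = dict(SAMPLE_CONFIG)
    fs_config["schema_name"] = None
    fs_config["target_schema_name"] = None

    config_manager = module_under_test.ConfigManager(
        fs_config, MockIbisClient(), MockIbisClient(), verbose=False
    )

    assert config_manager.source_schema is None
    assert config_manager.target_schema is None
```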
4 changes: 2 additions & 2 deletions tests/unit/test_data_validation.py
@@ -26,14 +26,14 @@
TARGET_TABLE_FILE_PATH = "target_table_data.json"

SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
}

TARGET_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": TARGET_TABLE_FILE_PATH,
"file_type": "json",
4 changes: 2 additions & 2 deletions tests/unit/test_schema_validation.py
@@ -23,14 +23,14 @@
TARGET_TABLE_FILE_PATH = "target_table_data.json"

SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
}

TARGET_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": TARGET_TABLE_FILE_PATH,
"file_type": "json",