feat: Added FileSystem connection type #254

Merged: 8 commits, May 27, 2021
README.md (9 additions, 3 deletions)

@@ -108,9 +108,15 @@ used to run powerful validations without writing any queries.

There are many occasions where you need to explore a data source while running
validations. To avoid the need to open and install a new client, the CLI allows
you to run custom queries. `data-validation query --conn connection-name The
named connection to be queried. --query, -q The Raw query to run against the
supplied connection`
you to run custom queries.

```
data-validation query
--conn or -c CONN
The connection name to be queried
--query or -q QUERY
The raw query to run against the supplied connection
```
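
An invocation might look like this (the connection name and query are illustrative):

```
data-validation query -c my_bq_conn -q "SELECT COUNT(*) FROM dataset.table"
```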

## Query Configurations

data_validation/__main__.py (5 additions, 1 deletion)

@@ -127,7 +127,11 @@ def build_config_managers_from_args(args):
target_client = clients.get_data_client(target_conn)

threshold = args.threshold if args.threshold else 0.0
tables_list = cli_tools.get_tables_list(args.tables_list, default_value=[])

is_filesystem = source_conn["source_type"] == "FileSystem"
tables_list = cli_tools.get_tables_list(
args.tables_list, default_value=[], is_filesystem=is_filesystem
)

for table_obj in tables_list:
config_manager = ConfigManager.build_config_manager(
data_validation/cli_tools.py (14 additions, 3 deletions)

@@ -113,6 +113,11 @@
["database_id", "ID of Spanner database (schema) to connect to"],
["google_service_account_key_path", "(Optional) GCP SA Key Path"],
],
"FileSystem": [
["table_name", "Table name to use as reference for file data"],
["file_path", "The local, s3, or GCS file path to the data"],
["file_type", "The file type of the file.'csv' or 'json'"],
],
}


@@ -491,11 +496,12 @@ def get_arg_list(arg_value, default_value=None):
return arg_list


def get_tables_list(arg_tables, default_value=None):
def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
""" Returns dictionary of tables. Backwards compatible for JSON input.

arg_table (str): tables_list argument specified
default_value (Any): A default value to supply when arg_value is empty.
is_filesystem (boolean): Boolean indicating whether source connection is a FileSystem. In this case, a schema is not required.
"""
if not arg_tables:
return default_value
@@ -506,17 +512,22 @@ def get_tables_list(arg_tables, default_value=None):
except json.decoder.JSONDecodeError:
tables_list = []
tables_mapping = list(csv.reader([arg_tables]))[0]
source_schema_required = not is_filesystem

for mapping in tables_mapping:
tables_map = mapping.split("=")
if len(tables_map) == 1:
schema, table = split_table(tables_map)
schema, table = split_table(
tables_map, schema_required=source_schema_required
)
table_dict = {
"schema_name": schema,
"table_name": table,
}
elif len(tables_map) == 2:
src_schema, src_table = split_table([tables_map[0]])
src_schema, src_table = split_table(
[tables_map[0]], schema_required=source_schema_required
)

table_dict = {
"schema_name": src_schema,
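
The intended behavior, sketched with illustrative values (this assumes `split_table` yields a `None` schema when `schema_required=False`):

```python
from data_validation import cli_tools

# Schema-based sources: "schema.table" splits into both parts.
cli_tools.get_tables_list("my_dataset.my_table")
# -> [{"schema_name": "my_dataset", "table_name": "my_table"}]

# FileSystem sources: a bare table name is accepted; the schema stays None.
cli_tools.get_tables_list("my_local_file", is_filesystem=True)
# -> [{"schema_name": None, "table_name": "my_local_file"}]
```
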
data_validation/clients.py (1 addition, 1 deletion)

@@ -214,7 +214,7 @@ def get_data_client(connection_config):
"Impala": impala_connect,
"MySQL": MySQLClient,
"Oracle": OracleClient,
"Pandas": get_pandas_client,
"FileSystem": get_pandas_client,
"Postgres": PostgreSQLClient,
"Redshift": PostgreSQLClient,
"Teradata": TeradataClient,
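
For context, a pandas-backed loader along these lines would satisfy the FileSystem parameters (`table_name`, `file_path`, `file_type`); this is a hypothetical sketch, not the repository's actual `get_pandas_client`:

```python
import ibis
import pandas as pd

def get_pandas_client(table_name, file_path, file_type):
    """Load a csv/json file into a pandas-backed Ibis client (sketch only)."""
    if file_type == "csv":
        df = pd.read_csv(file_path)
    elif file_type == "json":
        df = pd.read_json(file_path)
    else:
        raise ValueError(f"Unsupported file_type: {file_type}")
    # Register the DataFrame under the configured table name; the pandas
    # backend API may differ across Ibis versions.
    return ibis.pandas.connect({table_name: df})
```
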
data_validation/config_manager.py (12 additions, 8 deletions)

@@ -133,7 +133,7 @@ def filters(self):
@property
def source_schema(self):
"""Return string value of source schema."""
return self._config[consts.CONFIG_SCHEMA_NAME]
return self._config.get(consts.CONFIG_SCHEMA_NAME, None)

@property
def source_table(self):
@@ -143,9 +143,7 @@ def source_table(self):
@property
def target_schema(self):
"""Return string value of target schema."""
return self._config.get(
consts.CONFIG_TARGET_SCHEMA_NAME, self._config[consts.CONFIG_SCHEMA_NAME]
)
return self._config.get(consts.CONFIG_TARGET_SCHEMA_NAME, self.source_schema)

@property
def target_table(self):
@@ -283,11 +281,7 @@ def build_config_manager(
consts.CONFIG_TYPE: config_type,
consts.CONFIG_SOURCE_CONN: source_conn,
consts.CONFIG_TARGET_CONN: target_conn,
consts.CONFIG_SCHEMA_NAME: table_obj[consts.CONFIG_SCHEMA_NAME],
consts.CONFIG_TABLE_NAME: table_obj[consts.CONFIG_TABLE_NAME],
consts.CONFIG_TARGET_SCHEMA_NAME: table_obj.get(
consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME]
),
consts.CONFIG_TARGET_TABLE_NAME: table_obj.get(
consts.CONFIG_TARGET_TABLE_NAME, table_obj[consts.CONFIG_TABLE_NAME]
),
@@ -297,6 +291,16 @@
consts.CONFIG_FILTERS: filter_config,
}

# Only FileSystem connections do not require schemas
if source_conn["source_type"] != "FileSystem":
config[consts.CONFIG_SCHEMA_NAME] = table_obj[consts.CONFIG_SCHEMA_NAME]
config[consts.CONFIG_TARGET_SCHEMA_NAME] = table_obj.get(
consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME]
)
else:
config[consts.CONFIG_SCHEMA_NAME] = None
config[consts.CONFIG_TARGET_SCHEMA_NAME] = None

return ConfigManager(config, source_client, target_client, verbose=verbose)

def build_config_grouped_columns(self, grouped_columns):
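
Sketched out, a FileSystem table entry would yield a config whose schema keys are explicitly `None`; the literal key names below are an assumption about the values in `consts`:

```python
# Illustrative shape of the resulting config for a FileSystem source.
config = {
    "type": "Column",
    "schema_name": None,            # FileSystem sources carry no schema
    "table_name": "my_local_file",
    "target_schema_name": None,
    "target_table_name": "my_local_file",
}
```
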
docs/connections.md (22 additions, 5 deletions)

@@ -9,17 +9,17 @@ These commands can be used to create connections:

## Command template to create a connection:
```
data-validation connections add --connection-name my-conn-name source_type
data-validation connections add --connection-name CONN_NAME source_type
```

## Create a sample BigQuery connection:
```
data-validation connections add --connection-name MY-BQ-CONNECTION BigQuery --project-id MY-GCP-PROJECT
data-validation connections add --connection-name MY_BQ_CONN BigQuery --project-id MY_GCP_PROJECT
```

## Create a sample Teradata connection:
```
data-validation connections add --connection-name MY-TD-CONNECTION Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
data-validation connections add --connection-name MY_TD_CONN Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
```

## List existing connections
@@ -42,13 +42,14 @@ The data validation tool supports the following connection types.
* [Postgres](#postgres)
* [MySQL](#mysql)
* [Redshift](#redshift)
* [FileSystem](#filesystem)

As shown above, Teradata and BigQuery have different sets of custom arguments (for example, project_id for BigQuery versus host for Teradata).

Every connection type requires its own configuration for connectivity. To find out the parameters for each connection type, use the following command.

```
data-validation connections add -c '<name>' <connection type> -h
data-validation connections add -c CONN_NAME <connection type> -h
```

Below is the expected configuration for each type.
@@ -91,7 +92,7 @@ Below is the expected configuration for each type.
# Configuration Required for All Data Sources
"source_type": "Spanner",

# GCP Project to use for Spanne
# GCP Project to use for Spanner
"project_id": "my-project-name",

# ID of Spanner instance to connect to
@@ -220,3 +221,19 @@ Then `pip install pyodbc`.
}
```

## FileSystem
```
{
# Configuration Required for All Data Sources
"source_type": "FileSystem",

# Table name to use as a reference for file data
"table_name": "my_table_name",

# The local, s3, or GCS file path to the data
"file_path": "gs://path/to/file",

# The file type. Either 'csv' or 'json'
"file_type": "csv"
}
```
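
Such a connection can be created from the CLI; the flags below mirror the parameter list above (values are illustrative):

```
data-validation connections add --connection-name MY_FS_CONN FileSystem --table-name my_table_name --file-path gs://path/to/file --file-type csv
```
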
docs/examples.md (13 additions, 2 deletions)

@@ -2,9 +2,9 @@
This page describes some basic use cases of the tool.

**PLEASE NOTE:** In the commands below, my_bq_conn refers to the connection name for your BigQuery project. We are validating BigQuery tables that are
available in BigQuery public datasets.
available in BigQuery public datasets. These examples validate a table against itself for demonstration purposes.

Also, note that if no aggregation flag is provided, the tool will run a 'COUNT *' default aggregation.
Also, note that if no aggregation flag is provided, the tool will run a 'COUNT *' as the default aggregation.

#### Simple COUNT(*) on a table
````shell script
@@ -110,6 +110,17 @@ data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquer
data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -l tag=test-run,owner=name
````

#### Run validation on a file
````shell script
# Additional dependencies needed for GCS files
pip install gcsfs
pip install fsspec

data-validation connections add --connection-name file_conn FileSystem --table-name my_local_file --file-path gs://path/to/file --file-type csv

data-validation run -t Column -sc file_conn -tc file_conn -tbls my_local_file --count name
````

#### Run custom SQL
````shell script
data-validation query
tests/unit/test_clients.py (1 addition, 1 deletion)

@@ -30,7 +30,7 @@
SOURCE_TABLE_FILE_PATH = "source_table_data.json"
JSON_DATA = """[{"col_a":0,"col_b":"a"},{"col_a":1,"col_b":"b"}]"""
SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
tests/unit/test_config_manager.py (10 additions, 0 deletions)

@@ -101,6 +101,16 @@ def test_config_property(module_under_test):
assert config == config_manager._config


def test_schema_property(module_under_test):
"""Test getting schema."""
config_manager = module_under_test.ConfigManager(
SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
)

target_schema = config_manager.target_schema
assert target_schema == "bigquery-public-data.new_york_citibike"


def test_filters_property(module_under_test):
config_manager = module_under_test.ConfigManager(
SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False
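
A companion test for the source side could look like the sketch below, assuming `consts` is importable in this test module (it is not part of the PR):

```python
def test_source_schema_missing_key(module_under_test):
    """source_schema should return None when no schema is configured."""
    config = {
        k: v for k, v in SAMPLE_CONFIG.items() if k != consts.CONFIG_SCHEMA_NAME
    }
    config_manager = module_under_test.ConfigManager(
        config, MockIbisClient(), MockIbisClient(), verbose=False
    )
    assert config_manager.source_schema is None
```
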
tests/unit/test_data_validation.py (2 additions, 2 deletions)

@@ -26,14 +26,14 @@
TARGET_TABLE_FILE_PATH = "target_table_data.json"

SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
}

TARGET_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": TARGET_TABLE_FILE_PATH,
"file_type": "json",
tests/unit/test_schema_validation.py (2 additions, 2 deletions)

@@ -23,14 +23,14 @@
TARGET_TABLE_FILE_PATH = "target_table_data.json"

SOURCE_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": SOURCE_TABLE_FILE_PATH,
"file_type": "json",
}

TARGET_CONN_CONFIG = {
"source_type": "Pandas",
"source_type": "FileSystem",
"table_name": "my_table",
"file_path": TARGET_TABLE_FILE_PATH,
"file_type": "json",