From be7824df8cd5bacd61862c7ff266b70b698461fd Mon Sep 17 00:00:00 2001
From: Neha Nene
Date: Thu, 27 May 2021 18:44:08 -0500
Subject: [PATCH] feat: Added FileSystem connection type (#254)

* Added FileSystem connection type, made schema optional for file validations

* feat: added filesystem connection type and made schema optional for file validations

* resolving merge conflict
---
 README.md                            | 12 +++++++++---
 data_validation/__main__.py          |  6 +++++-
 data_validation/cli_tools.py         | 17 ++++++++++++++---
 data_validation/clients.py           |  2 +-
 data_validation/config_manager.py    | 20 ++++++++++++--------
 docs/connections.md                  | 27 ++++++++++++++++++++++-----
 docs/examples.md                     | 15 +++++++++++++--
 tests/unit/test_clients.py           |  2 +-
 tests/unit/test_config_manager.py    | 10 ++++++++++
 tests/unit/test_data_validation.py   |  4 ++--
 tests/unit/test_schema_validation.py |  4 ++--
 11 files changed, 91 insertions(+), 28 deletions(-)

diff --git a/README.md b/README.md
index 5b32943b2..33bc933d2 100644
--- a/README.md
+++ b/README.md
@@ -108,9 +108,15 @@ used to run powerful validations without writing any queries.
 
 There are many occasions where you need to explore a data source while running
 validations. To avoid the need to open and install a new client, the CLI allows
-you to run custom queries. `data-validation query --conn connection-name The
-named connection to be queried. --query, -q The Raw query to run against the
-supplied connection`
+you to run custom queries.
+
+```
+data-validation query
+  --conn or -c CONN
+      The connection name to be queried
+  --query or -q QUERY
+      The raw query to run against the supplied connection
+```
 
 ## Query Configurations
diff --git a/data_validation/__main__.py b/data_validation/__main__.py
index 9fc440afa..1a57a825e 100644
--- a/data_validation/__main__.py
+++ b/data_validation/__main__.py
@@ -127,7 +127,11 @@ def build_config_managers_from_args(args):
     target_client = clients.get_data_client(target_conn)
 
     threshold = args.threshold if args.threshold else 0.0
-    tables_list = cli_tools.get_tables_list(args.tables_list, default_value=[])
+
+    is_filesystem = source_conn["source_type"] == "FileSystem"
+    tables_list = cli_tools.get_tables_list(
+        args.tables_list, default_value=[], is_filesystem=is_filesystem
+    )
 
     for table_obj in tables_list:
         config_manager = ConfigManager.build_config_manager(
diff --git a/data_validation/cli_tools.py b/data_validation/cli_tools.py
index 7e18409c3..faf2e1542 100644
--- a/data_validation/cli_tools.py
+++ b/data_validation/cli_tools.py
@@ -113,6 +113,11 @@
         ["database_id", "ID of Spanner database (schema) to connect to"],
         ["google_service_account_key_path", "(Optional) GCP SA Key Path"],
     ],
+    "FileSystem": [
+        ["table_name", "Table name to use as a reference for file data"],
+        ["file_path", "The local, S3, or GCS file path to the data"],
+        ["file_type", "The file type of the file: 'csv' or 'json'"],
+    ],
 }
 
 
@@ -491,11 +496,12 @@ def get_arg_list(arg_value, default_value=None):
     return arg_list
 
 
-def get_tables_list(arg_tables, default_value=None):
+def get_tables_list(arg_tables, default_value=None, is_filesystem=False):
    """ Returns dictionary of tables. Backwards compatible for JSON input.
        arg_table (str): tables_list argument specified
        default_value (Any): A default value to supply when arg_value is empty.
+       is_filesystem (boolean): Indicates whether the source connection is a
+           FileSystem. If so, a schema is not required.
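+
+       Example (illustrative; the FileSystem result shape is an assumption
+       based on the schema-optional handling in this function):
+           get_tables_list("my_schema.my_table")
+               -> [{"schema_name": "my_schema", "table_name": "my_table"}]
+           get_tables_list("my_file_table", is_filesystem=True)
+               -> [{"schema_name": None, "table_name": "my_file_table"}]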
""" if not arg_tables: return default_value @@ -506,17 +512,22 @@ def get_tables_list(arg_tables, default_value=None): except json.decoder.JSONDecodeError: tables_list = [] tables_mapping = list(csv.reader([arg_tables]))[0] + source_schema_required = False if is_filesystem else True for mapping in tables_mapping: tables_map = mapping.split("=") if len(tables_map) == 1: - schema, table = split_table(tables_map) + schema, table = split_table( + tables_map, schema_required=source_schema_required + ) table_dict = { "schema_name": schema, "table_name": table, } elif len(tables_map) == 2: - src_schema, src_table = split_table([tables_map[0]]) + src_schema, src_table = split_table( + [tables_map[0]], schema_required=source_schema_required + ) table_dict = { "schema_name": src_schema, diff --git a/data_validation/clients.py b/data_validation/clients.py index 0050bbebe..4b606453a 100644 --- a/data_validation/clients.py +++ b/data_validation/clients.py @@ -214,7 +214,7 @@ def get_data_client(connection_config): "Impala": impala_connect, "MySQL": MySQLClient, "Oracle": OracleClient, - "Pandas": get_pandas_client, + "FileSystem": get_pandas_client, "Postgres": PostgreSQLClient, "Redshift": PostgreSQLClient, "Teradata": TeradataClient, diff --git a/data_validation/config_manager.py b/data_validation/config_manager.py index 9ce59636b..cc37fee12 100644 --- a/data_validation/config_manager.py +++ b/data_validation/config_manager.py @@ -133,7 +133,7 @@ def filters(self): @property def source_schema(self): """Return string value of source schema.""" - return self._config[consts.CONFIG_SCHEMA_NAME] + return self._config.get(consts.CONFIG_SCHEMA_NAME, None) @property def source_table(self): @@ -143,9 +143,7 @@ def source_table(self): @property def target_schema(self): """Return string value of target schema.""" - return self._config.get( - consts.CONFIG_TARGET_SCHEMA_NAME, self._config[consts.CONFIG_SCHEMA_NAME] - ) + return self._config.get(consts.CONFIG_TARGET_SCHEMA_NAME, self.source_schema) @property def target_table(self): @@ -283,11 +281,7 @@ def build_config_manager( consts.CONFIG_TYPE: config_type, consts.CONFIG_SOURCE_CONN: source_conn, consts.CONFIG_TARGET_CONN: target_conn, - consts.CONFIG_SCHEMA_NAME: table_obj[consts.CONFIG_SCHEMA_NAME], consts.CONFIG_TABLE_NAME: table_obj[consts.CONFIG_TABLE_NAME], - consts.CONFIG_TARGET_SCHEMA_NAME: table_obj.get( - consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME] - ), consts.CONFIG_TARGET_TABLE_NAME: table_obj.get( consts.CONFIG_TARGET_TABLE_NAME, table_obj[consts.CONFIG_TABLE_NAME] ), @@ -297,6 +291,16 @@ def build_config_manager( consts.CONFIG_FILTERS: filter_config, } + # Only FileSystem connections do not require schemas + if source_conn["source_type"] != "FileSystem": + config[consts.CONFIG_SCHEMA_NAME] = table_obj[consts.CONFIG_SCHEMA_NAME] + config[consts.CONFIG_TARGET_SCHEMA_NAME] = table_obj.get( + consts.CONFIG_TARGET_SCHEMA_NAME, table_obj[consts.CONFIG_SCHEMA_NAME] + ) + else: + config[consts.CONFIG_SCHEMA_NAME] = None + config[consts.CONFIG_TARGET_SCHEMA_NAME] = None + return ConfigManager(config, source_client, target_client, verbose=verbose) def build_config_grouped_columns(self, grouped_columns): diff --git a/docs/connections.md b/docs/connections.md index cbc2e44aa..3a4a1d4b1 100644 --- a/docs/connections.md +++ b/docs/connections.md @@ -9,17 +9,17 @@ These commands can be used to create connections: ## Command template to create a connection: ``` -data-validation connections add --connection-name my-conn-name source_type 
diff --git a/docs/connections.md b/docs/connections.md
index cbc2e44aa..3a4a1d4b1 100644
--- a/docs/connections.md
+++ b/docs/connections.md
@@ -9,17 +9,17 @@ These commands can be used to create connections:
 
 ## Command template to create a connection:
 ```
-data-validation connections add --connection-name my-conn-name source_type
+data-validation connections add --connection-name CONN_NAME source_type
 ```
 
 ## Create a sample BigQuery connection:
 ```
-data-validation connections add --connection-name MY-BQ-CONNECTION BigQuery --project-id MY-GCP-PROJECT
+data-validation connections add --connection-name MY_BQ_CONN BigQuery --project-id MY_GCP_PROJECT
 ```
 
 ## Create a sample Teradata connection:
 ```
-data-validation connections add --connection-name MY-TD-CONNECTION Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
+data-validation connections add --connection-name MY_TD_CONN Teradata --host HOST_IP --port PORT --user_name USER_NAME --password PASSWORD
 ```
 
 ## List existing connections
@@ -42,13 +42,14 @@ The data validation tool supports the following connection types.
 * [Postgres](#postgres)
 * [MySQL](#mysql)
 * [Redshift](#redshift)
+* [FileSystem](#filesystem)
 
 As you see above, Teradata and BigQuery have different sets of custom arguments (for example, project_id for BQ versus host for Teradata). Every connection type requires its own configuration for connectivity. To find out the parameters for each connection type, use the following command.
 
 ```
-data-validation connections add -c '' -h
+data-validation connections add -c CONN_NAME -h
 ```
 
 Below is the expected configuration for each type.
@@ -91,7 +92,7 @@
    # Configuration Required for All Data Sources
    "source_type": "Spanner",
 
-   # GCP Project to use for Spanne
+   # GCP Project to use for Spanner
    "project_id": "my-project-name",
 
    # ID of Spanner instance to connect to
@@ -220,3 +221,19 @@ Then `pip install pyodbc`.
 }
 ```
 
+## FileSystem
+```
+{
+    # Configuration Required for All Data Sources
+    "source_type": "FileSystem",
+
+    # Table name to use as a reference for file data
+    "table_name": "my_table_name",
+
+    # The local, S3, or GCS file path to the data
+    "file_path": "gs://path/to/file",
+
+    # The file type. Either 'csv' or 'json'
+    "file_type": "csv"
+}
+```
diff --git a/docs/examples.md b/docs/examples.md
index 932ad9c93..a2cfc16f2 100644
--- a/docs/examples.md
+++ b/docs/examples.md
@@ -2,9 +2,9 @@ This page describes some basic use cases of the tool.
 
 **PLEASE NOTE:** In the commands below, my_bq_conn refers to the connection name for your BigQuery project. We are validating BigQuery tables that are
-available in BigQuery public datasets.
+available in BigQuery public datasets. These examples validate a table against itself for demonstration purposes.
 
-Also, note that if no aggregation flag is provided, the tool will run a 'COUNT *' default aggregation.
+Also, note that if no aggregation flag is provided, the tool will run 'COUNT(*)' as the default aggregation.
#### Simple COUNT(*) on a table ````shell script @@ -110,6 +110,17 @@ data-validation run -t GroupedColumn -sc my_bq_conn -tc my_bq_conn -tbls bigquer data-validation run -t Column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --count tripduration,start_station_name -l tag=test-run,owner=name ```` +#### Run validation on a file +````shell script +# Additional dependencies needed for GCS files +pip install gcsfs +pip install fsspec + +data-validation connections add --connection-name file_conn FileSystem --table-name my_local_file --file-path gs://path/to/file --file-type csv + +data-validation run -t Column -sc file_conn -tc file_conn -tbls my_local_file --count name +```` + #### Run custom SQL ````shell script data-validation query diff --git a/tests/unit/test_clients.py b/tests/unit/test_clients.py index 24ab67d6f..d700d66c2 100644 --- a/tests/unit/test_clients.py +++ b/tests/unit/test_clients.py @@ -30,7 +30,7 @@ SOURCE_TABLE_FILE_PATH = "source_table_data.json" JSON_DATA = """[{"col_a":0,"col_b":"a"},{"col_a":1,"col_b":"b"}]""" SOURCE_CONN_CONFIG = { - "source_type": "Pandas", + "source_type": "FileSystem", "table_name": "my_table", "file_path": SOURCE_TABLE_FILE_PATH, "file_type": "json", diff --git a/tests/unit/test_config_manager.py b/tests/unit/test_config_manager.py index e9d811fe1..35ba739df 100644 --- a/tests/unit/test_config_manager.py +++ b/tests/unit/test_config_manager.py @@ -101,6 +101,16 @@ def test_config_property(module_under_test): assert config == config_manager._config +def test_schema_property(module_under_test): + """Test getting schema.""" + config_manager = module_under_test.ConfigManager( + SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False + ) + + target_schema = config_manager.target_schema + assert target_schema == "bigquery-public-data.new_york_citibike" + + def test_filters_property(module_under_test): config_manager = module_under_test.ConfigManager( SAMPLE_CONFIG, MockIbisClient(), MockIbisClient(), verbose=False diff --git a/tests/unit/test_data_validation.py b/tests/unit/test_data_validation.py index d09ba3598..d2a79308f 100644 --- a/tests/unit/test_data_validation.py +++ b/tests/unit/test_data_validation.py @@ -26,14 +26,14 @@ TARGET_TABLE_FILE_PATH = "target_table_data.json" SOURCE_CONN_CONFIG = { - "source_type": "Pandas", + "source_type": "FileSystem", "table_name": "my_table", "file_path": SOURCE_TABLE_FILE_PATH, "file_type": "json", } TARGET_CONN_CONFIG = { - "source_type": "Pandas", + "source_type": "FileSystem", "table_name": "my_table", "file_path": TARGET_TABLE_FILE_PATH, "file_type": "json", diff --git a/tests/unit/test_schema_validation.py b/tests/unit/test_schema_validation.py index 3bfa13ac1..da515798e 100644 --- a/tests/unit/test_schema_validation.py +++ b/tests/unit/test_schema_validation.py @@ -23,14 +23,14 @@ TARGET_TABLE_FILE_PATH = "target_table_data.json" SOURCE_CONN_CONFIG = { - "source_type": "Pandas", + "source_type": "FileSystem", "table_name": "my_table", "file_path": SOURCE_TABLE_FILE_PATH, "file_type": "json", } TARGET_CONN_CONFIG = { - "source_type": "Pandas", + "source_type": "FileSystem", "table_name": "my_table", "file_path": TARGET_TABLE_FILE_PATH, "file_type": "json",
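Taken together, a minimal sketch of how a FileSystem connection flows through the new code path; the connection dict mirrors SOURCE_CONN_CONFIG in the tests above, while the data file written here is hypothetical:

```python
# Hypothetical end-to-end sketch of the new FileSystem connection type.
# The connection dict mirrors SOURCE_CONN_CONFIG from the tests above; the
# JSON file written here is made up for illustration.
import json

from data_validation import clients

# Write a tiny data file matching the JSON_DATA fixture used in the tests.
with open("source_table_data.json", "w") as f:
    json.dump([{"col_a": 0, "col_b": "a"}, {"col_a": 1, "col_b": "b"}], f)

conn = {
    "source_type": "FileSystem",  # dispatches to get_pandas_client in clients.py
    "table_name": "my_table",     # name the file's data is registered under
    "file_path": "source_table_data.json",
    "file_type": "json",
}
client = clients.get_data_client(conn)
# The resulting client stands in for a database client; per the
# config_manager.py change, no schema_name is required for this source type.
```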