Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get_schema with Postgres #411

Merged
merged 2 commits into from
Apr 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions data_validation/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ def get_ibis_table(client, schema_name, table_name, database_name=None):
return client.table(table_name, database=schema_name)


def get_ibis_table_schema(client, schema_name, table_name):
    """Return the Ibis table schema for the supplied client.

    client (IbisClient): Client used to look up the table
    schema_name (str): Schema name of table object
    table_name (str): Table name of table object

    For MySQL and PostgreSQL clients the table is resolved through the
    client's schema object and its ``schema()`` is returned; every other
    client is asked directly via ``client.get_schema``.
    """
    # Exact type match on purpose: subclasses of these clients fall through
    # to the generic ``get_schema`` path.
    if type(client) not in (MySQLClient, PostgreSQLClient):
        return client.get_schema(table_name, schema_name)

    # Resolve the schema first, then the table inside it.
    schema_obj = client.schema(schema_name)
    return schema_obj.table(table_name).schema()


def list_schemas(client):
"""Return a list of schemas in the DB."""
if type(client) in [
Expand Down
14 changes: 9 additions & 5 deletions data_validation/schema_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import datetime
import pandas

from data_validation import metadata, consts
from data_validation import metadata, consts, clients


class SchemaValidation(object):
Expand All @@ -33,11 +33,15 @@ def __init__(self, config_manager, run_metadata=None, verbose=False):

def execute(self):
"""Performs a validation between source and a target schema"""
ibis_source_schema = self.config_manager.source_client.get_schema(
self.config_manager.source_table, self.config_manager.source_schema
ibis_source_schema = clients.get_ibis_table_schema(
self.config_manager.source_client,
self.config_manager.source_schema,
self.config_manager.source_table,
)
ibis_target_schema = self.config_manager.target_client.get_schema(
self.config_manager.target_table, self.config_manager.target_schema
ibis_target_schema = clients.get_ibis_table_schema(
self.config_manager.target_client,
self.config_manager.target_schema,
self.config_manager.target_table,
)

source_fields = {}
Expand Down
24 changes: 24 additions & 0 deletions tests/system/data_sources/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@
consts.CONFIG_FORMAT: "table",
}

# Schema-validation config: compares the MySQL ``guestbook.entries`` table
# against itself, so source and target share the same connection.
CONFIG_SCHEMA_VALID = {
    consts.CONFIG_SOURCE_CONN: CONN,
    consts.CONFIG_TARGET_CONN: CONN,
    # Must be "Schema" (not "Column") so the schema validator is exercised.
    consts.CONFIG_TYPE: "Schema",
    consts.CONFIG_SCHEMA_NAME: "guestbook",
    consts.CONFIG_TABLE_NAME: "entries",
    consts.CONFIG_FORMAT: "table",
}


def test_mysql_count_invalid_host():
try:
Expand All @@ -61,3 +70,18 @@ def test_mysql_count_invalid_host():
except exceptions.DataClientConnectionFailure:
# Local Testing will not work for MySQL
pass


def test_schema_validation():
    """Run the CONFIG_SCHEMA_VALID validation and require every row to succeed.

    A connection failure is tolerated so the test does not fail on machines
    without a reachable MySQL instance.
    """
    try:
        validator = data_validation.DataValidation(
            CONFIG_SCHEMA_VALID, verbose=False
        )
        result_df = validator.execute()
        records = result_df.to_dict(orient="records")

        for record in records:
            assert record["status"] == consts.VALIDATION_STATUS_SUCCESS
    except exceptions.DataClientConnectionFailure:
        # Local Testing will not work for MySQL
        return
44 changes: 33 additions & 11 deletions tests/system/data_sources/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@
# Cloud SQL proxy listens to localhost
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD")
# Database name is overridable; falls back to "guestbook" when unset.
POSTGRES_DATABASE = os.environ.get("POSTGRES_DATABASE", "guestbook")
PROJECT_ID = os.environ.get("PROJECT_ID")

# Connection settings shared by the Postgres tests in this module.
CONN = {
    "source_type": "Postgres",
    "host": POSTGRES_HOST,
    "user": "postgres",
    "password": POSTGRES_PASSWORD,
    "port": 5432,
    "database": POSTGRES_DATABASE,
}


@pytest.fixture
def cloud_sql(request):
Expand All @@ -54,19 +64,10 @@ def cloud_sql(request):

def test_postgres_count(cloud_sql):
"""Test count validation on Postgres instance"""
conn = {
"source_type": "Postgres",
"host": POSTGRES_HOST,
"user": "postgres",
"password": POSTGRES_PASSWORD,
"port": 5432,
"database": "guestbook",
}

config_count_valid = {
# BigQuery Specific Connection Config
consts.CONFIG_SOURCE_CONN: conn,
consts.CONFIG_TARGET_CONN: conn,
consts.CONFIG_SOURCE_CONN: CONN,
consts.CONFIG_TARGET_CONN: CONN,
# Validation Type
consts.CONFIG_TYPE: "Column",
# Configuration Required Depending on Validator Type
Expand Down Expand Up @@ -103,3 +104,24 @@ def test_postgres_count(cloud_sql):

assert df["source_agg_value"].equals(df["target_agg_value"])
assert sorted(list(df["source_agg_value"])) == ["28", "7", "7"]


def test_schema_validation(cloud_sql):
    """Test schema validation on Postgres instance.

    Validates the ``public.entries`` table against itself and requires every
    result row to report a successful status.
    """
    # Source and target use the same connection, so the schemas should match.
    schema_config = {
        consts.CONFIG_SOURCE_CONN: CONN,
        consts.CONFIG_TARGET_CONN: CONN,
        consts.CONFIG_TYPE: "Schema",
        consts.CONFIG_SCHEMA_NAME: "public",
        consts.CONFIG_TABLE_NAME: "entries",
        consts.CONFIG_FORMAT: "table",
    }

    validator = data_validation.DataValidation(schema_config, verbose=False)
    result_df = validator.execute()
    records = result_df.to_dict(orient="records")

    for record in records:
        assert record["status"] == consts.VALIDATION_STATUS_SUCCESS