feat: teradata hashing implementation (#324)
* registering optional teradata hash functions

* adding test

* scaffolding for teradata system tests

* repointing td endpoint

* changing td user

* linting

* swapping out account for udf

* I hate yaml

* I mean it

* adding teradata execution

* removing old test

* veeeeeeery old test is updated

* linting

* testing private pools for internal connectivity

* adding filter test

* lint

* constants are a blessing and a curse

* trying to duplicate TD filter issue

* trying equality filter

* thinking is hard

* linting
renzokuken committed Nov 4, 2021
1 parent 1dba63b commit b74e03e
Showing 5 changed files with 98 additions and 59 deletions.
11 changes: 11 additions & 0 deletions cloudbuild.yaml
@@ -13,6 +13,9 @@
# limitations under the License.

timeout: 7200s
options:
  pool:
    name: 'projects/pso-kokoro-resources/locations/us-central1/workerPools/private-pool'
steps:
- id: lint
  name: 'gcr.io/pso-kokoro-resources/python-multi'
@@ -70,6 +73,14 @@ steps:
  - 'NOX_SESSION=integration_spanner'
  - 'PROJECT_ID=pso-kokoro-resources'
  waitFor: ['-']
- id: integration_teradata
  name: 'gcr.io/pso-kokoro-resources/python-multi'
  args: ['bash', './ci/build.sh']
  env:
  - 'NOX_SESSION=integration_teradata'
  - 'PROJECT_ID=pso-kokoro-resources'
  - 'TERADATA_PASSWORD=udfs'
  - 'TERADATA_HOST=10.128.15.235'
- id: integration_state
  name: 'gcr.io/pso-kokoro-resources/python-multi'
  args: ['bash', './ci/build.sh']
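
A note on the new step: 10.128.15.235 is a private RFC 1918 address, which is why the build now runs on the private worker pool declared under options above; Cloud Build's default shared workers have no route into the project's VPC. A quick way to sanity-check that routing from inside a build step is a plain socket probe (a debugging sketch, not part of the commit; port 1025 is the Teradata default used in the test config below):

# Debugging sketch: verify the worker can reach the Teradata host.
import os
import socket

host = os.environ.get("TERADATA_HOST", "10.128.15.235")
try:
    socket.create_connection((host, 1025), timeout=5).close()
    print("%s:1025 is reachable" % host)
except OSError as exc:
    print("%s:1025 is unreachable: %s" % (host, exc))
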
17 changes: 17 additions & 0 deletions noxfile.py
@@ -198,6 +198,23 @@ def integration_spanner(session):
    session.run("pytest", "tests/system/data_sources/test_spanner.py", *session.posargs)


@nox.session(python=PYTHON_VERSIONS, venv_backend="venv")
def integration_teradata(session):
    """Run Teradata integration tests.
    Ensure Teradata validation is running as expected.
    """
    _setup_session_requirements(session, extra_packages=["teradatasql"])

    expected_env_vars = ["PROJECT_ID", "TERADATA_PASSWORD", "TERADATA_HOST"]
    for env_var in expected_env_vars:
        if not os.environ.get(env_var, ""):
            raise Exception("Expected Env Var: %s" % env_var)

    session.run(
        "pytest", "tests/system/data_sources/test_teradata.py", *session.posargs
    )


@nox.session(python=random.choice(PYTHON_VERSIONS), venv_backend="venv")
def integration_state(session):
"""Run StateManager integration tests.
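
To reproduce this session outside CI, export the same three variables and invoke nox; a local-run sketch (credential values are placeholders, and the host must be reachable from your machine, e.g. over a VPN into the VPC):

# Local-run sketch; placeholder credentials, not part of the commit.
import os
import subprocess

env = dict(
    os.environ,
    PROJECT_ID="my-project",        # placeholder
    TERADATA_PASSWORD="********",   # placeholder
    TERADATA_HOST="10.128.15.235",  # from cloudbuild.yaml; must be routable
)
subprocess.run(["nox", "-s", "integration_teradata"], env=env, check=True)
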
99 changes: 41 additions & 58 deletions tests/system/data_sources/test_teradata.py
@@ -12,72 +12,55 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os

from data_validation import data_validation
from data_validation.query_builder import query_builder
from data_validation import data_validation, consts

TERADATA_PASSWORD = os.getenv("TERADATA_PASSWORD")
TERADATA_HOST = os.getenv("TERADATA_HOST")
PROJECT_ID = os.getenv("PROJECT_ID")

TERADATA_CONFIG = {
    # Configuration Required for All Data Sources
conn = {
    "source_type": "Teradata",
    # BigQuery Specific Connection Config
    "config": {
        "host": "127.0.0.1",
        "user_name": "dbc",
        "password": "dbc",
        "port": 10255,
    },
    # Configuration Required Depending on Validator Type
    "schema_name": "Sys_Calendar",
    "table_name": "CALENDAR",
    "partition_column": "year_of_calendar",
    "format": "table",
    "host": TERADATA_HOST,
    "user_name": "udf",
    "password": TERADATA_PASSWORD,
    "port": 1025,
}


def create_validator(builder):
    return data_validation.DataValidation(
        TERADATA_CONFIG, builder=builder, result_handler=None, verbose=False
    )
TERADATA_CONFIG = {
    # Specific Connection Config
    consts.CONFIG_SOURCE_CONN: conn,
    consts.CONFIG_TARGET_CONN: conn,
    # Validation Type
    consts.CONFIG_TYPE: "Column",
    # Configuration Required Depending on Validator Type
    consts.CONFIG_SCHEMA_NAME: "Sys_Calendar",
    consts.CONFIG_TABLE_NAME: "CALENDAR",
    consts.CONFIG_AGGREGATES: [
        {
            consts.CONFIG_TYPE: "count",
            consts.CONFIG_SOURCE_COLUMN: "year_of_calendar",
            consts.CONFIG_TARGET_COLUMN: "year_of_calendar",
            consts.CONFIG_FIELD_ALIAS: "count",
        },
    ],
    consts.CONFIG_FORMAT: "table",
    consts.CONFIG_FILTERS: [
        {
            consts.CONFIG_TYPE: consts.FILTER_TYPE_EQUALS,
            consts.CONFIG_FILTER_SOURCE_COLUMN: "year_of_calendar",
            consts.CONFIG_FILTER_SOURCE_VALUE: 2010,
            consts.CONFIG_FILTER_TARGET_COLUMN: "year_of_calendar",
            consts.CONFIG_FILTER_TARGET_VALUE: 2010,
        },
    ],
}


def test_count_validator():
    builder = query_builder.QueryBuilder.build_count_validator()
    validator = create_validator(builder)
    df = validator.execute()
    assert df["count_inp"][0] > 0
    assert df["count_inp"][0] == df["count_out"][0]


def test_partitioned_count_validator():
    builder = query_builder.QueryBuilder.build_partition_count_validator(
        days_past=700, limit=100
    )
    # Add Filters for large table
    _add_calendar_date_filters(builder)

    validator = create_validator(builder)
    validator = data_validation.DataValidation(TERADATA_CONFIG, verbose=True)
    df = validator.execute()
    rows = list(df.iterrows())

    # Check that all partitions are unique.
    partitions = frozenset(df["partition_key"])
    assert len(rows) == len(partitions)

    for _, row in rows:
        assert row["count_inp"] > 0
        assert row["count_inp"] == row["count_out"]


def _add_calendar_date_filters(builder):
    # Adding custom filter for better results
    project_start_date = datetime(2020, 1, 1, 0, 0, 0)
    filter_obj = query_builder.FilterField.less_than(
        "calendar_date", project_start_date
    )
    builder.add_filter_field(filter_obj)

    in_the_past = datetime(1991, 5, 2, 0, 0, 0)
    filter_obj = query_builder.FilterField.greater_than("calendar_date", in_the_past)
    builder.add_filter_field(filter_obj)
    assert int(df["source_agg_value"][0]) > 0
    assert df["source_agg_value"][0] == df["target_agg_value"][0]
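
The rewrite swaps the old QueryBuilder plumbing for the config-dict API. Because CONFIG_SOURCE_CONN and CONFIG_TARGET_CONN point at the same conn, the equality filter (year_of_calendar = 2010 on both sides) makes this a self-consistency check: the filtered count on the source must equal the same count on the target. A sketch of the same run outside pytest, assuming the repo root is on sys.path and the env vars above are exported:

# Standalone sketch, not part of the commit; reuses the test's config.
from data_validation import data_validation
from tests.system.data_sources.test_teradata import TERADATA_CONFIG

validator = data_validation.DataValidation(TERADATA_CONFIG, verbose=True)
df = validator.execute()
# One row per configured aggregate; source and target values must agree.
print(df[["source_agg_value", "target_agg_value"]])
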
28 changes: 28 additions & 0 deletions tests/system/ibis_addon/test_operations.py
@@ -20,13 +20,19 @@

# Import required in order to register operations.
import third_party.ibis.ibis_addon.operations # noqa: F401
from third_party.ibis import ibis_teradata


@pytest.fixture
def bigquery_client():
    return ibis_bigquery.connect()


@pytest.fixture
def teradata_client():
    return ibis_teradata.connect()


def test_bit_xor_bigquery(bigquery_client):
    tbl = bigquery_client.table(
        "citibike_trips", database="bigquery-public-data.new_york_citibike"
@@ -126,3 +132,25 @@ def test_hashbytes_bigquery_binary(bigquery_client):
"""
).strip()
)


def test_hashbytes_teradata_binary(teradata_client):
    tbl = teradata_client.table("citibike_trips", database="udfs.new_york_citibike")
    expr = tbl[
        tbl["start_station_name"]
        .cast(dt.binary)
        .hashbytes(how="sha256")
        .name("station_hash")
    ]
    sql = expr.compile()
    # TODO: Update the expected SQL to be a valid query once
    # https://github.com/ibis-project/ibis/issues/2354 is fixed.
    assert (
        sql
        == textwrap.dedent(
            """
            SELECT hash_sha256(CAST(`start_station_name` AS BINARY)) AS `station_hash`
            FROM `udfs.citibike_trips`
            """
        ).strip()
    )
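
Worth noting: .hashbytes(...) is not stock Ibis; it exists because the third_party.ibis.ibis_addon.operations import at the top of this file registers a custom op and patches a helper onto value expressions. A hedged sketch of that pattern for the ibis 1.x-era API this repo targets (the addon's actual definitions may differ):

# Sketch of the custom-op registration pattern, not the addon's exact code.
import ibis.expr.datatypes as dt
import ibis.expr.rules as rlz
from ibis.expr.operations import Arg, ValueOp
from ibis.expr.types import BinaryValue, StringValue


class HashBytes(ValueOp):
    arg = Arg(rlz.one_of([rlz.value(dt.string), rlz.value(dt.binary)]))
    how = Arg(rlz.isin({"farm_fingerprint", "sha256"}))
    output_type = rlz.shape_like("arg", dt.binary)


def compile_hashbytes(value, how="sha256"):
    # Exposes HashBytes as .hashbytes(how=...) on string/binary columns.
    return HashBytes(value, how=how).to_expr()


BinaryValue.hashbytes = compile_hashbytes
StringValue.hashbytes = compile_hashbytes
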
2 changes: 1 addition & 1 deletion third_party/ibis/ibis_addon/operations.py
@@ -155,4 +155,4 @@ def sa_format_raw_sql(translator, expr):
ImpalaExprTranslator._registry[RawSQL] = format_raw_sql
OracleExprTranslator._registry[RawSQL] = sa_format_raw_sql
TeradataExprTranslator._registry[RawSQL] = format_raw_sql
TeradataExprTranslator._registry[RawSQL] = format_hashbytes_bigquery
TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata
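
This one-line change is the actual fix: the removed line registered the RawSQL key a second time, silently clobbering format_raw_sql with the BigQuery hashbytes formatter, while HashBytes itself had no Teradata entry at all. Keying the Teradata formatter under HashBytes is what lets the new test above compile to hash_sha256(...). A hedged sketch of the formatter implied by that expected SQL (the addon's real function may differ):

# Sketch of format_hashbytes_teradata as implied by the test's expected SQL.
def format_hashbytes_teradata(translator, expr):
    arg, how = expr.op().args
    compiled_arg = translator.translate(arg)
    if how == "sha256":
        # Assumes a hash_sha256 UDF is installed in the target database.
        return "hash_sha256({})".format(compiled_arg)
    raise ValueError("Unsupported hash function: {}".format(how))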
