From e3dd7ed1d524c613f9c78c36bb1ad5346c37ec62 Mon Sep 17 00:00:00 2001
From: Helen Cristina
Date: Tue, 24 May 2022 20:15:39 +0000
Subject: [PATCH] feat: add a new DAG example to run DVT (#485)

* feat: add a new DAG example to run DVT
* feat: Change DAG for simple count. Remove old DAG
* docs: Add section for Airflow on Installation page
* docs: Update Airflow section on Installation page
---
 README.md                          | 31 +---------
 docs/installation.md               |  6 ++
 samples/airflow/dvt_airflow_dag.py | 95 ++++++++++++++++++++++++++++++
 samples/airflow_dag.py             | 89 ----------------------------
 samples/tests/test_airflow_dag.py  |  4 +-
 5 files changed, 104 insertions(+), 121 deletions(-)
 create mode 100644 samples/airflow/dvt_airflow_dag.py
 delete mode 100644 samples/airflow_dag.py

diff --git a/README.md b/README.md
index cc3a63602..7cff0afba 100644
--- a/README.md
+++ b/README.md
@@ -642,36 +642,7 @@ in the Data Validation tool, it is a simple process.
 3. You are done, you can reference the data source via the config.
 
    - Config: {"source_type": "", ...KV Values required in Client...}
-
-## Deploy to Composer
-
-```
-#!/bin/bash
-
-export COMPOSER_ENV=""
-export LOCATION=""
-
-echo "Creating Composer Env: $COMPOSER_ENV"
-gcloud services enable composer.googleapis.com
-gcloud composer environments create $COMPOSER_ENV --location=$LOCATION --python-version=3
-
-echo "Updating Composer Env Reqs: $COMPOSER_ENV"
-# Composer builds Pandas and BigQuery for you, these should be stripped out
-cat requirements.txt | grep -v pandas | grep -v google-cloud-bigquery > temp_reqs.txt
-gcloud composer environments update $COMPOSER_ENV --location=$LOCATION --update-pypi-packages-from-file=temp_reqs.txt
-rm temp_reqs.txt
-
-# Deploy Package to Composer (the hacky way)
-echo "Rebuilding Data Validation Package in GCS"
-export GCS_BUCKET_PATH=$(gcloud composer environments describe $COMPOSER_ENV --location=$LOCATION | grep dagGcsPrefix | awk '{print $2;}')
-gsutil rm -r $GCS_BUCKET_PATH/data_validation
-gsutil cp -r data_validation $GCS_BUCKET_PATH/data_validation
-
-# Deploy Test DAG to Composer
-echo "Pushing Data Validation Test Operator to GCS"
-gsutil cp tests/test_data_validation_operators.py $GCS_BUCKET_PATH/
-```
-
+ 
 ## Contributing
 
 Contributions are welcome. See the [contributing guide](CONTRIBUTING.md) for
diff --git a/docs/installation.md b/docs/installation.md
index 1e0f282b1..0d407a397 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -124,3 +124,9 @@ The unit test suite can be executed using either `pytest tests/unit` or `python
 
 If native installation is not an option for you, you can create a Docker image for this tool.
 Here's an [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/docker/README.md) on how you can create a sample docker image for this tool.
+
+
+## Automate using Apache Airflow
+You can orchestrate DVT by running a validation as a task within an Airflow DAG.
+
+Here's a simple [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/airflow/dvt_airflow_dag.py) of how you can execute this tool using the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator).
\ No newline at end of file
diff --git a/samples/airflow/dvt_airflow_dag.py b/samples/airflow/dvt_airflow_dag.py
new file mode 100644
index 000000000..e4e79f2e9
--- /dev/null
+++ b/samples/airflow/dvt_airflow_dag.py
@@ -0,0 +1,95 @@
+"""
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+"""
+
+"""
+Example DAG for Data Validation Tool
+
+Requirements to run the DAG:
+- Airflow environment created with Public IP (Private environment disabled)
+- Create an Airflow variable called 'gcp_project' with the GCP Project ID
+- Create BigQuery dataset and table to store the validation results
+    bq mk pso_data_validator
+    bq mk --table \
+      --time_partitioning_field start_time \
+      --clustering_fields validation_name,run_id \
+      pso_data_validator.results \
+      results_schema.json
+"""
+
+from datetime import timedelta
+
+import airflow
+from airflow import DAG
+from airflow import models
+from airflow.operators.python import PythonVirtualenvOperator
+
+default_args = {
+    "start_date": airflow.utils.dates.days_ago(1),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+}
+
+with DAG(
+    "dvt_example_dag",
+    default_args=default_args,
+    description="Data Validation Tool Example DAG",
+    schedule_interval=None,
+    dagrun_timeout=timedelta(minutes=60),
+) as dag:
+
+    def validation_function(project):
+        from data_validation import data_validation
+        from data_validation.result_handlers import bigquery as bqhandler
+
+        BQ_CONN = {"source_type": "BigQuery", "project_id": project}
+
+        GROUPED_CONFIG_COUNT_VALID = {
+            # BigQuery Specific Connection Config
+            "source_conn": BQ_CONN,
+            "target_conn": BQ_CONN,
+            # Validation Type
+            "type": "Column",
+            # Configuration Required Depending on Validator Type
+            "schema_name": "bigquery-public-data.new_york_citibike",
+            "table_name": "citibike_trips",
+            "aggregates": [
+                {
+                    "field_alias": "count",
+                    # Tool will run a 'COUNT *' as the default aggregation
+                    "source_column": None,
+                    "target_column": None,
+                    "type": "count",
+                },
+            ],
+        }
+
+        handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
+        validator = data_validation.DataValidation(
+            GROUPED_CONFIG_COUNT_VALID, verbose=True, result_handler=handler
+        )
+        validator.execute()
+
+    gcp_project = models.Variable.get("gcp_project")
+
+    virtualenv_task = PythonVirtualenvOperator(
+        task_id="dvt-virtualenv",
+        python_callable=validation_function,
+        op_args=[gcp_project],
+        requirements=["google-pso-data-validator"],
+        system_site_packages=False,
+    )
+
+virtualenv_task
diff --git a/samples/airflow_dag.py b/samples/airflow_dag.py
deleted file mode 100644
index aa15682dd..000000000
--- a/samples/airflow_dag.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import airflow
-import warnings
-
-# from data_validation import data_validation, consts, exceptions
-# from data_validation.query_builder import query_builder
-from airflow import DAG
-from datetime import timedelta
-
-from data_validation.composer import operators
-
-BQ_CONFIG_VALID = {
-    # Configuration Required for All Data Soures
-    "source_type": "BigQuery",
-    # BigQuery Specific Connection Config
-    "config": {"project_id": os.environ["GCP_PROJECT"]},
-    # Configuration Required Depending on Validator Type
-    "schema_name": "bigquery-public-data.new_york_citibike",
-    "table_name": "citibike_trips",
-    "partition_column": "starttime",
-}
-
-# TODO: To use this code I would need to whitelist the MySQL instance
-MYSQL_CONFIG_INVALID = {
-    # Configuration Required for All Data Soures
-    "source_type": "MySQL",
-    # BigQuery Specific Connection Config
-    "config": {
-        "host": "35.227.139.75",
-        "user": "root",
-        "password": "password",
-        "port": 3306,
-        "database": "guestbook",
-        "driver": "pymysql",
-    },
-    # Configuration Required Depending on Validator Type
-    "schema_name": "guestbook",
-    "table_name": "entries",
-    "partition_column": "starttime",
-}
-
-
-default_args = {
-    "start_date": airflow.utils.dates.days_ago(0),
-    "retries": 1,
-    "retry_delay": timedelta(minutes=5),
-}
-
-dag = DAG(
-    "data_validation_test",
-    default_args=default_args,
-    description="Test Data Validation Operators",
-    schedule_interval=None,
-    dagrun_timeout=timedelta(minutes=60),
-)
-
-
-# priority_weight has type int in Airflow DB, uses the maximum.
-validate_bq_table_count = operators.DataValidationCountOperator(
-    task_id="validate_bq_table_count",
-    source_config=BQ_CONFIG_VALID,
-    target_config=BQ_CONFIG_VALID,
-    dag=dag,
-    # depends_on_past=False,
-    # priority_weight=2**31-1
-)
-
-validate_mysql_table_count = operators.DataValidationCountOperator(
-    task_id="validate_mysql_table_count",
-    source_config=MYSQL_CONFIG_INVALID,
-    target_config=MYSQL_CONFIG_INVALID,
-    dag=dag,
-    # depends_on_past=False,
-    # priority_weight=2**31-1
-)
diff --git a/samples/tests/test_airflow_dag.py b/samples/tests/test_airflow_dag.py
index ad6323103..fbb82cec3 100644
--- a/samples/tests/test_airflow_dag.py
+++ b/samples/tests/test_airflow_dag.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Google LLC
+# Copyright 2022 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,6 +21,6 @@ def test_dag_import():
     environment. This is a recommended sanity check by the official Airflow
     docs: https://airflow.incubator.apache.org/tutorial.html#testing
     """
-    from .. import airflow_dag as module
+    from ..airflow import dvt_airflow_dag as module
 
     airflow_test_utils.assert_has_valid_dag(module)
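
Note: the `airflow_test_utils.assert_has_valid_dag` helper called in the test above is not part of this patch. As a rough, hypothetical sketch of the kind of sanity check it performs (assuming it simply verifies that the imported module defines at least one Airflow `DAG` object), it could look like this:

```python
# Hypothetical sketch only -- the real airflow_test_utils.assert_has_valid_dag
# implementation is not shown in this patch.
from airflow.models import DAG


def assert_has_valid_dag(module):
    """Assert that the given module defines at least one Airflow DAG object."""
    dag_found = any(isinstance(value, DAG) for value in vars(module).values())
    assert dag_found, f"{module.__name__} does not define any DAG objects"
```

Importing the DAG module in a unit test like this catches syntax and other top-level errors before the file is ever deployed to an Airflow environment.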