feat: add a new DAG example to run DVT (#485)
* feat: add a new DAG example to run DVT

* feat: Change DAG for simple count. Remove old DAG

* docs: Add section for Airflow on Installation page

* docs: Update Airflow section on Installation page
helensilva14 committed May 24, 2022
1 parent 083de07 commit e3dd7ed
Showing 5 changed files with 104 additions and 121 deletions.
31 changes: 1 addition & 30 deletions README.md
@@ -642,36 +642,7 @@ in the Data Validation tool, it is a simple process.
3. You are done; you can reference the data source via the config (see the sketch below).

- Config: {"source_type": "<RefName>", ...KV Values required in Client...}
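
For illustration, a minimal sketch of such a config, using the BigQuery connection shape from the example DAG added in this commit (the project ID is a placeholder):

```
# A BigQuery connection config; the example DAG in this commit builds the
# same dictionary. "my-gcp-project" is a placeholder, and other clients
# follow the same pattern with their own required key/value pairs.
BQ_CONN = {"source_type": "BigQuery", "project_id": "my-gcp-project"}
```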

## Deploy to Composer

```
#!/bin/bash
export COMPOSER_ENV=""
export LOCATION=""
echo "Creating Composer Env: $COMPOSER_ENV"
gcloud services enable composer.googleapis.com
gcloud composer environments create $COMPOSER_ENV --location=$LOCATION --python-version=3
echo "Updating Composer Env Reqs: $COMPOSER_ENV"
# Composer builds Pandas and BigQuery for you, these should be stripped out
cat requirements.txt | grep -v pandas | grep -v google-cloud-bigquery > temp_reqs.txt
gcloud composer environments update $COMPOSER_ENV --location=$LOCATION --update-pypi-packages-from-file=temp_reqs.txt
rm temp_reqs.txt
# Deploy Package to Composer (the hacky way)
echo "Rebuilding Data Validation Package in GCS"
export GCS_BUCKET_PATH=$(gcloud composer environments describe $COMPOSER_ENV --location=$LOCATION | grep dagGcsPrefix | awk '{print $2;}')
gsutil rm -r $GCS_BUCKET_PATH/data_validation
gsutil cp -r data_validation $GCS_BUCKET_PATH/data_validation
# Deploy Test DAG to Composer
echo "Pushing Data Validation Test Operator to GCS"
gsutil cp tests/test_data_validation_operators.py $GCS_BUCKET_PATH/
```


## Contributing

Contributions are welcome. See the [contributing guide](CONTRIBUTING.md) for
6 changes: 6 additions & 0 deletions docs/installation.md
@@ -124,3 +124,9 @@ The unit test suite can be executed using either `pytest tests/unit` or `python
If native installation is not an option for you, you can create a Docker image for this tool.

Here's an [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/docker/README.md) of how you can create a sample Docker image for this tool.


## Automate using Apache Airflow
You can orchestrate DVT by running a validation as a task within an Airflow DAG.

Here's a simple [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/airflow/dvt_airflow_dag.py) of how you can execute this tool using the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator).
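
At its core, the pattern is a Python callable that imports DVT inside the task's virtualenv, plus a `PythonVirtualenvOperator` that installs `google-pso-data-validator` into that virtualenv. Below is a minimal sketch of that pattern; the DAG ID, task ID, and project ID are placeholders, the validation config mirrors the linked example (a row count of the public `bigquery-public-data.new_york_citibike.citibike_trips` table), and unlike the full example it does not persist results to BigQuery.

```
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.utils.dates import days_ago


def run_dvt_count_validation(project):
    # Imported inside the callable so the package resolves in the task's
    # virtualenv, where google-pso-data-validator is installed at run time.
    from data_validation import data_validation

    config = {
        "source_conn": {"source_type": "BigQuery", "project_id": project},
        "target_conn": {"source_type": "BigQuery", "project_id": project},
        "type": "Column",
        "schema_name": "bigquery-public-data.new_york_citibike",
        "table_name": "citibike_trips",
        # A single COUNT aggregation over the whole table
        "aggregates": [
            {"field_alias": "count", "source_column": None,
             "target_column": None, "type": "count"},
        ],
    }
    data_validation.DataValidation(config, verbose=True).execute()


with DAG(
    "dvt_minimal_example",  # placeholder DAG ID
    start_date=days_ago(1),
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    PythonVirtualenvOperator(
        task_id="dvt_count_validation",  # placeholder task ID
        python_callable=run_dvt_count_validation,
        op_args=["my-gcp-project"],  # placeholder GCP project ID
        requirements=["google-pso-data-validator"],
        system_site_packages=False,
    )
```

In a Composer environment, the project ID would typically come from an Airflow variable instead of a literal, as the full example does with `models.Variable.get("gcp_project")`.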
95 changes: 95 additions & 0 deletions samples/airflow/dvt_airflow_dag.py
@@ -0,0 +1,95 @@
"""
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
"""

"""
Example DAG for Data Validation Tool
Requirements to run the DAG:
- Airflow environment created with Public IP (Private environment disabled)
- Create an Airflow variable called 'gcp_project' with the GCP Project ID
- Create BigQuery dataset and table to store the validation results
    bq mk pso_data_validator
    bq mk --table \
      --time_partitioning_field start_time \
      --clustering_fields validation_name,run_id \
      pso_data_validator.results \
      results_schema.json
"""

from datetime import timedelta

import airflow
from airflow import DAG
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "dvt_example_dag",
    default_args=default_args,
    description="Data Validation Tool Example DAG",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    def validation_function(project):
        from data_validation import data_validation
        from data_validation.result_handlers import bigquery as bqhandler

        BQ_CONN = {"source_type": "BigQuery", "project_id": project}

        GROUPED_CONFIG_COUNT_VALID = {
            # BigQuery Specific Connection Config
            "source_conn": BQ_CONN,
            "target_conn": BQ_CONN,
            # Validation Type
            "type": "Column",
            # Configuration Required Depending on Validator Type
            "schema_name": "bigquery-public-data.new_york_citibike",
            "table_name": "citibike_trips",
            "aggregates": [
                {
                    "field_alias": "count",
                    # Tool will run a 'COUNT *' as the default aggregation
                    "source_column": None,
                    "target_column": None,
                    "type": "count",
                },
            ],
        }

        handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
        validator = data_validation.DataValidation(
            GROUPED_CONFIG_COUNT_VALID, verbose=True, result_handler=handler
        )
        validator.execute()

    gcp_project = models.Variable.get("gcp_project")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="dvt-virtualenv",
        python_callable=validation_function,
        op_args=[gcp_project],
        requirements=["google-pso-data-validator"],
        system_site_packages=False,
    )

    virtualenv_task
89 changes: 0 additions & 89 deletions samples/airflow_dag.py

This file was deleted.

4 changes: 2 additions & 2 deletions samples/tests/test_airflow_dag.py
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,6 @@ def test_dag_import():
environment. This is a recommended sanity check by the official Airflow
docs: https://airflow.incubator.apache.org/tutorial.html#testing
"""
from .. import airflow_dag as module
from ..airflow import dvt_airflow_dag as module

airflow_test_utils.assert_has_valid_dag(module)
