feat: add a new DAG example to run DVT #485

Merged (4 commits) on May 24, 2022

31 changes: 1 addition & 30 deletions README.md
@@ -638,36 +638,7 @@ in the Data Validation tool, it is a simple process.
3. You are done; you can now reference the data source via the config.

- Config: {"source_type": "<RefName>", ...KV Values required in Client...}
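
  For illustration only, a connection config for a custom client might look like the sketch below. The `MyWarehouse` name and its keys are made-up placeholders for whatever reference name you registered and whatever keyword values your client requires; they are not real DVT options.

  ```python
  # Hypothetical connection config: "MyWarehouse" stands in for the <RefName>
  # you registered, and the remaining keys are whatever keyword values your
  # client's constructor needs.
  MY_WAREHOUSE_CONN = {
      "source_type": "MyWarehouse",
      "host": "warehouse.example.com",
      "port": 5439,
      "user": "dvt_user",
      "password": "change-me",
  }
  ```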

## Deploy to Composer

```
#!/bin/bash

export COMPOSER_ENV=""
export LOCATION=""

echo "Creating Composer Env: $COMPOSER_ENV"
gcloud services enable composer.googleapis.com
gcloud composer environments create $COMPOSER_ENV --location=$LOCATION --python-version=3

echo "Updating Composer Env Reqs: $COMPOSER_ENV"
# Composer already provides pandas and google-cloud-bigquery, so these should be stripped out
cat requirements.txt | grep -v pandas | grep -v google-cloud-bigquery > temp_reqs.txt
gcloud composer environments update $COMPOSER_ENV --location=$LOCATION --update-pypi-packages-from-file=temp_reqs.txt
rm temp_reqs.txt

# Deploy Package to Composer (the hacky way)
echo "Rebuilding Data Validation Package in GCS"
export GCS_BUCKET_PATH=$(gcloud composer environments describe $COMPOSER_ENV --location=$LOCATION | grep dagGcsPrefix | awk '{print $2;}')
gsutil rm -r $GCS_BUCKET_PATH/data_validation
gsutil cp -r data_validation $GCS_BUCKET_PATH/data_validation

# Deploy Test DAG to Composer
echo "Pushing Data Validation Test Operator to GCS"
gsutil cp tests/test_data_validation_operators.py $GCS_BUCKET_PATH/
```


## Contributing

Contributions are welcome. See the [contributing guide](CONTRIBUTING.md) for
6 changes: 6 additions & 0 deletions docs/installation.md
@@ -124,3 +124,9 @@ The unit test suite can be executed using either `pytest tests/unit` or `python
If native installation is not an option for you, you can create a Docker image for this tool.

Here's an [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/docker/README.md) of how you can create a sample Docker image for this tool.


## Automate using Apache Airflow
You can orchestrate DVT by running a validation as a task within an Airflow DAG.

Here's a simple [example](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/samples/airflow/dvt_airflow_dag.py) of how you can execute this tool using the [PythonVirtualenvOperator](https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator).
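
As a quick orientation before the full file in this PR's diff, the core pattern is sketched below: a callable that imports DVT inside the virtualenv, and an operator that installs `google-pso-data-validator` into that environment. This is a condensed sketch of the DAG added in this PR; the DAG name, task name, and `"my-gcp-project"` are illustrative placeholders.

```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
from airflow.utils.dates import days_ago


def run_dvt_validation(project):
    # Imports happen inside the callable because it executes in the
    # virtualenv created by the operator, not in the scheduler process.
    from data_validation import data_validation
    from data_validation.result_handlers import bigquery as bqhandler

    conn = {"source_type": "BigQuery", "project_id": project}
    config = {
        "source_conn": conn,
        "target_conn": conn,
        "type": "Column",
        "schema_name": "bigquery-public-data.new_york_citibike",
        "table_name": "citibike_trips",
        # A single COUNT(*) aggregate, as in the full example DAG.
        "aggregates": [
            {
                "field_alias": "count",
                "source_column": None,
                "target_column": None,
                "type": "count",
            }
        ],
    }
    # Write results to the pso_data_validator.results BigQuery table.
    handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
    data_validation.DataValidation(
        config, verbose=True, result_handler=handler
    ).execute()


with DAG(
    "dvt_sketch_dag",
    start_date=days_ago(1),
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    PythonVirtualenvOperator(
        task_id="dvt-virtualenv",
        python_callable=run_dvt_validation,
        op_args=["my-gcp-project"],  # placeholder: pass your GCP project ID
        requirements=["google-pso-data-validator"],
        system_site_packages=False,
    )
```
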
95 changes: 95 additions & 0 deletions samples/airflow/dvt_airflow_dag.py
@@ -0,0 +1,95 @@
"""
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
"""

"""
Example DAG for Data Validation Tool

Requirements to run the DAG:
- Airflow environment created with Public IP (Private environment disabled)
- Create an Airflow variable called 'gcp_project' with the GCP Project ID
- Create BigQuery dataset and table to store the validation results
bq mk pso_data_validator
bq mk --table \
--time_partitioning_field start_time \
--clustering_fields validation_name,run_id \
pso_data_validator.results \
results_schema.json
"""

from datetime import timedelta

import airflow
from airflow import DAG
from airflow import models
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
"start_date": airflow.utils.dates.days_ago(1),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG(
"dvt_example_dag",
default_args=default_args,
description="Data Validation Tool Example DAG",
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
) as dag:

def validation_function(project):
from data_validation import data_validation
from data_validation.result_handlers import bigquery as bqhandler

BQ_CONN = {"source_type": "BigQuery", "project_id": project}

GROUPED_CONFIG_COUNT_VALID = {
# BigQuery Specific Connection Config
"source_conn": BQ_CONN,
"target_conn": BQ_CONN,
# Validation Type
"type": "Column",
# Configuration Required Depending on Validator Type
"schema_name": "bigquery-public-data.new_york_citibike",
"table_name": "citibike_trips",
"aggregates": [
{
"field_alias": "count",
# Tool will run a 'COUNT *' as the default aggregation
"source_column": None,
"target_column": None,
"type": "count",
},
],
}

handler = bqhandler.BigQueryResultHandler.get_handler_for_project(project)
validator = data_validation.DataValidation(
GROUPED_CONFIG_COUNT_VALID, verbose=True, result_handler=handler
)
validator.execute()

gcp_project = models.Variable.get("gcp_project")

virtualenv_task = PythonVirtualenvOperator(
task_id="dvt-virtualenv",
python_callable=validation_function,
op_args=[gcp_project],
requirements=["google-pso-data-validator"],
system_site_packages=False,
)

virtualenv_task
89 changes: 0 additions & 89 deletions samples/airflow_dag.py

This file was deleted.

4 changes: 2 additions & 2 deletions samples/tests/test_airflow_dag.py
@@ -1,4 +1,4 @@
# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,6 @@ def test_dag_import():
environment. This is a recommended sanity check by the official Airflow
docs: https://airflow.incubator.apache.org/tutorial.html#testing
"""
from .. import airflow_dag as module
from ..airflow import dvt_airflow_dag as module

airflow_test_utils.assert_has_valid_dag(module)
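
For context, a check like `assert_has_valid_dag` typically just verifies that the module imported cleanly and defines at least one DAG object. The snippet below is a hypothetical sketch of such a helper, not the repo's actual `airflow_test_utils` implementation.

```python
from airflow.models import DAG


def assert_has_valid_dag(module):
    """Hypothetical sketch: fail unless the module defines at least one DAG."""
    dags = [obj for obj in vars(module).values() if isinstance(obj, DAG)]
    assert dags, f"{module.__name__} defines no DAG objects"
```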