Skip to content

Commit

Permalink
feat: SQL Server custom query support (#640)
Browse files Browse the repository at this point in the history
* feat: support for MSSQL custom query validation, fix to ensure custom query only brings necessary columns into memory

* docs

* remove function annotation
  • Loading branch information
nehanene15 committed Dec 6, 2022
1 parent 14b506b commit 98ab010
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ page provides few examples of how this tool can be used to run custom query vali

#### Custom Query Row Validations

(Note: Row hash validation is currently only supported for BigQuery, Teradata, and
(Note: Custom query row validation is currently only supported for BigQuery, Teradata, SQL Server, and
Impala/Hive. Struct and array data types are not currently supported.)

Below is the command syntax for row validations. In order to run row level
Expand Down
1 change: 1 addition & 0 deletions data_validation/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import ibis_bigquery
import pandas
import third_party.ibis.ibis_addon.datatypes
import third_party.ibis.ibis_addon.base_sqlalchemy.alchemy
from google.cloud import bigquery
from ibis.backends.mysql.client import MySQLClient
from ibis.backends.pandas.client import PandasClient
Expand Down
5 changes: 4 additions & 1 deletion data_validation/query_builder/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,10 @@ def compile(self, validation_type, table):
for n in range(0, (depth_limit + 1)):
table = table.mutate(self.compile_calculated_fields(table, n))

if validation_type == consts.ROW_VALIDATION:
if (
validation_type == consts.ROW_VALIDATION
or validation_type == consts.CUSTOM_QUERY
):
table = table.projection(self.compile_comparison_fields(table))
else:
if self.comparison_fields:
Expand Down
Empty file.
79 changes: 79 additions & 0 deletions third_party/ibis/ibis_addon/base_sqlalchemy/alchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlalchemy as sa
from ibis.backends.base_sqlalchemy.alchemy import (
_to_sqla_type,
AlchemyTable,
_AlchemyTableSet,
)
import ibis.expr.schema as sch
import ibis.expr.operations as ops


def _schema_to_sqlalchemy_columns(schema: sch.Schema):
    """Build a list of SQLAlchemy column clauses from an ibis schema.

    Each ``(name, type)`` pair yielded by ``schema.items()`` becomes one
    ``sa.column`` whose type is the result of ``_to_sqla_type`` on the ibis
    type.
    """
    sqla_columns = []
    for col_name, ibis_type in schema.items():
        sqla_columns.append(sa.column(col_name, _to_sqla_type(ibis_type)))
    return sqla_columns


def _format_table_new(self, expr):
    """Return the aliased SQLAlchemy selectable for the table expression ``expr``.

    The operation behind ``expr`` (or, for a self-reference, behind the table
    it wraps) decides how the selectable is built:

    * ``AlchemyTable``: its ``sqla_table`` is used directly.
    * ``ops.SQLQueryResult``: the raw query text is wrapped with ``sa.text``
      and given typed columns derived from the operation's schema.
    * ``ops.UnboundTable``: a ``sa.table`` is built from the schema's names
      and types.
    * anything else is treated as a subquery and compiled through the
      context, unless the context reports it as already extracted, in which
      case the table registered in the context is returned early.

    In the non-early-return paths the selectable is aliased with the
    context's reference for ``expr`` and registered via ``ctx.set_table``.
    """
    context = self.context
    node = expr.op()

    # For a self-reference, inspect the wrapped table rather than the
    # reference node itself.
    is_self_ref = isinstance(node, ops.SelfReference)
    if is_self_ref:
        target_expr = node.table
        target_op = target_expr.op()
    else:
        target_expr = expr
        target_op = node

    alias = context.get_ref(expr)

    if isinstance(target_op, AlchemyTable):
        selectable = target_op.sqla_table
    elif isinstance(target_op, ops.SQLQueryResult):
        # Raw SQL text: attach typed columns taken from the op's schema.
        text_columns = _schema_to_sqlalchemy_columns(target_op.schema)
        selectable = sa.text(target_op.query).columns(*text_columns)
    elif isinstance(target_op, ops.UnboundTable):
        # use SQLAlchemy's TableClause and ColumnClause for unbound tables
        schema = target_op.schema
        table_name = target_op.name
        if table_name is None:
            table_name = context.get_ref(expr)
        column_clauses = [
            sa.column(col_name, _to_sqla_type(ibis_type))
            for col_name, ibis_type in zip(schema.names, schema.types)
        ]
        selectable = sa.table(table_name, *column_clauses)
    else:
        # A subquery
        if context.is_extracted(target_expr):
            # Was put elsewhere, e.g. WITH block, we just need to grab
            # its alias
            alias = context.get_ref(expr)

            if not is_self_ref:
                return context.get_table(expr)

            # hack: alias the already-registered table for the self-reference
            # and register that alias under ``expr``.
            extracted_table = context.get_table(target_expr)
            self_ref = extracted_table.alias(alias)
            context.set_table(expr, self_ref)
            return self_ref

        selectable = context.get_compiled_expr(expr)
        alias = context.get_ref(expr)

    aliased = selectable.alias(alias)
    context.set_table(expr, aliased)
    return aliased


# Monkey-patch applied at import time: replace ``_format_table`` on ibis's
# ``_AlchemyTableSet`` with ``_format_table_new`` from this module, which
# includes a branch for ``ops.SQLQueryResult`` table references.
_AlchemyTableSet._format_table = _format_table_new

0 comments on commit 98ab010

Please sign in to comment.