diff --git a/README.md b/README.md index 62fe90630..93c489d46 100644 --- a/README.md +++ b/README.md @@ -277,7 +277,7 @@ page provides few examples of how this tool can be used to run custom query vali #### Custom Query Row Validations -(Note: Row hash validation is currently only supported for BigQuery, Teradata, and +(Note: Custom query row validation is currently only supported for BigQuery, Teradata, SQL Server, and Impala/Hive. Struct and array data types are not currently supported.) Below is the command syntax for row validations. In order to run row level diff --git a/data_validation/clients.py b/data_validation/clients.py index 35fd1625b..c2dedbb68 100644 --- a/data_validation/clients.py +++ b/data_validation/clients.py @@ -22,6 +22,7 @@ import ibis_bigquery import pandas import third_party.ibis.ibis_addon.datatypes +import third_party.ibis.ibis_addon.base_sqlalchemy.alchemy from google.cloud import bigquery from ibis.backends.mysql.client import MySQLClient from ibis.backends.pandas.client import PandasClient diff --git a/data_validation/query_builder/query_builder.py b/data_validation/query_builder/query_builder.py index de8a9e5d5..3044f154d 100644 --- a/data_validation/query_builder/query_builder.py +++ b/data_validation/query_builder/query_builder.py @@ -492,7 +492,10 @@ def compile(self, validation_type, table): for n in range(0, (depth_limit + 1)): table = table.mutate(self.compile_calculated_fields(table, n)) - if validation_type == consts.ROW_VALIDATION: + if ( + validation_type == consts.ROW_VALIDATION + or validation_type == consts.CUSTOM_QUERY + ): table = table.projection(self.compile_comparison_fields(table)) else: if self.comparison_fields: diff --git a/third_party/ibis/ibis_addon/base_sqlalchemy/__init__.py b/third_party/ibis/ibis_addon/base_sqlalchemy/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/third_party/ibis/ibis_addon/base_sqlalchemy/alchemy.py b/third_party/ibis/ibis_addon/base_sqlalchemy/alchemy.py new file mode 100644 index 000000000..665bb2263 --- /dev/null +++ b/third_party/ibis/ibis_addon/base_sqlalchemy/alchemy.py @@ -0,0 +1,79 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy as sa +from ibis.backends.base_sqlalchemy.alchemy import ( + _to_sqla_type, + AlchemyTable, + _AlchemyTableSet, +) +import ibis.expr.schema as sch +import ibis.expr.operations as ops + + +def _schema_to_sqlalchemy_columns(schema: sch.Schema): + return [sa.column(n, _to_sqla_type(t)) for n, t in schema.items()] + + +def _format_table_new(self, expr): + ctx = self.context + ref_expr = expr + op = ref_op = expr.op() + + if isinstance(op, ops.SelfReference): + ref_expr = op.table + ref_op = ref_expr.op() + + alias = ctx.get_ref(expr) + if isinstance(ref_op, AlchemyTable): + result = ref_op.sqla_table + elif isinstance(ref_op, ops.SQLQueryResult): + columns = _schema_to_sqlalchemy_columns(ref_op.schema) + result = sa.text(ref_op.query).columns(*columns) + elif isinstance(ref_op, ops.UnboundTable): + # use SQLAlchemy's TableClause and ColumnClause for unbound tables + schema = ref_op.schema + result = sa.table( + ref_op.name if ref_op.name is not None else ctx.get_ref(expr), + *( + sa.column(n, _to_sqla_type(t)) + for n, t in zip(schema.names, schema.types) + ), + ) + else: + # A subquery + if ctx.is_extracted(ref_expr): + # Was put elsewhere, e.g. WITH block, we just need to grab + # its alias + alias = ctx.get_ref(expr) + + # hack + if isinstance(op, ops.SelfReference): + + table = ctx.get_table(ref_expr) + self_ref = table.alias(alias) + ctx.set_table(expr, self_ref) + return self_ref + else: + return ctx.get_table(expr) + + result = ctx.get_compiled_expr(expr) + alias = ctx.get_ref(expr) + + result = result.alias(alias) + ctx.set_table(expr, result) + return result + + +_AlchemyTableSet._format_table = _format_table_new