feat: Hive hash function support (#392)
* adding addons for impala hive hashing functions

* fix: import fixed_arity

* move logic to ibis_addon

* replacing isnull with nvl

* adding nvl function

* test FillNa

* missing import

* updating t0 prefix to column names

Co-authored-by: Mike Hilton <[email protected]>
2 people authored and ngdav committed Mar 16, 2022
1 parent c7a22ce commit 24d87bb
Showing 4 changed files with 67 additions and 25 deletions.
6 changes: 4 additions & 2 deletions docs/connections.md
@@ -269,8 +269,10 @@ Please note that for Group By validations, the following property must be set in

`set hive:hive.groupby.orderby.position.alias=true`

-If you are running Hive on Dataproc, you will also need to run
-`pip install ibis-framework[impala]`
+If you are running Hive on Dataproc, you will also need to run
+`pip install ibis-framework[impala]`
+
+Currently only INT, BIGINT, FLOAT, and DOUBLE data types are supported for Hive aggregation.

```
{
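For reference, a Hive column validation relying on the settings above might look like the following sketch; `my_hive_conn`, `my_db.my_table`, and the column names are placeholders, and the flags mirror those used in `docs/examples.md`:

````shell script
data-validation validate column -sc my_hive_conn -tc my_hive_conn -tbls my_db.my_table --grouped-columns my_group_col --count my_int_col
````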
2 changes: 1 addition & 1 deletion docs/examples.md
@@ -43,7 +43,7 @@ data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-pub

#### Run a checksum validation for all rows
````shell script
-data-validation validate row -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --primary-keys station_id --hash '*'
+data-validation validate row -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --primary-keys station_id --hash '*'
````

#### Store results in a BigQuery table
51 changes: 32 additions & 19 deletions third_party/ibis/ibis_addon/operations.py
@@ -27,18 +27,11 @@
import sqlalchemy

import ibis.expr.api
-from ibis_bigquery.compiler import (
-    reduction as bq_reduction,
-    BigQueryExprTranslator
-)
+from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
import ibis.expr.datatypes as dt
-from ibis.expr.operations import (
-    Arg, Comparison, Reduction, ValueOp
-)
+from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
import ibis.expr.rules as rlz
-from ibis.expr.types import (
-    BinaryValue, IntegerColumn, StringValue
-)
+from ibis.expr.types import BinaryValue, IntegerColumn, StringValue
from ibis.backends.impala.compiler import ImpalaExprTranslator
from ibis.backends.pandas import client as _pandas_client
from ibis.backends.base_sqlalchemy.alchemy import AlchemyExprTranslator
@@ -56,27 +49,29 @@ class BitXor(Reduction):

    arg = Arg(rlz.column(rlz.integer))
    where = Arg(rlz.boolean, default=None)
-    output_type = rlz.scalar_like('arg')
+    output_type = rlz.scalar_like("arg")


class Hash(ValueOp):
    arg = Arg(rlz.any)
-    how = Arg(rlz.isin({'fnv', 'farm_fingerprint'}))
-    output_type = rlz.shape_like('arg', dt.int64)
+    how = Arg(rlz.isin({"fnv", "farm_fingerprint"}))
+    output_type = rlz.shape_like("arg", dt.int64)


class HashBytes(ValueOp):
    arg = Arg(rlz.one_of([rlz.value(dt.string), rlz.value(dt.binary)]))
-    how = Arg(rlz.isin({'sha256', 'farm_fingerprint'}))
-    output_type = rlz.shape_like('arg', 'binary')
+    how = Arg(rlz.isin({"sha256", "farm_fingerprint"}))
+    output_type = rlz.shape_like("arg", "binary")


class RawSQL(Comparison):
    pass


def compile_hash(numeric_value, how):
    return Hash(numeric_value, how=how).to_expr()


def compile_hash(binary_value, how):
    return Hash(binary_value, how=how).to_expr()
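# Note: this second compile_hash definition shadows the numeric variant
# above; both construct the same Hash expression, so behavior is identical.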

@@ -87,15 +82,16 @@ def format_hash_bigquery(translator, expr):

    arg_formatted = translator.translate(arg)

-    if how == 'farm_fingerprint':
-        return f'farm_fingerprint({arg_formatted})'
+    if how == "farm_fingerprint":
+        return f"farm_fingerprint({arg_formatted})"
    else:
        raise NotImplementedError(how)


def compile_hashbytes(binary_value, how):
    return HashBytes(binary_value, how=how).to_expr()


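# Note: format_hash_bigquery is redefined below; as the later module-level
# definition, it is the one that ends up registered for the BigQuery translator.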
def format_hash_bigquery(translator, expr):
    arg, how = expr.op().args
    compiled_arg = translator.translate(arg)
@@ -104,6 +100,7 @@ def format_hash_bigquery(translator, expr):
    else:
        raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_bigquery(translator, expr):
    arg, how = expr.op().args
    compiled_arg = translator.translate(arg)
@@ -114,6 +111,7 @@ def format_hashbytes_bigquery(translator, expr):
    else:
        raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_teradata(translator, expr):
    arg, how = expr.op().args
    compiled_arg = translator.translate(arg)
@@ -126,6 +124,18 @@ def format_hashbytes_teradata(translator, expr):
    else:
        raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_hive(translator, expr):
    arg, how = expr.op().args
    compiled_arg = translator.translate(arg)
    if how == "sha256":
        return f"sha2({compiled_arg}, 256)"
    elif how == "md5":
        return f"md5({compiled_arg})"
    else:
        raise ValueError(f"unexpected value for 'how': {how}")
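# Note: with the ImpalaExprTranslator registration below, how="sha256"
# compiles to Hive's built-in sha2(<col>, 256). The "md5" branch is
# currently unreachable, since HashBytes.how above only admits "sha256"
# and "farm_fingerprint".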


def compile_raw_sql(table, sql):
    op = RawSQL(table[table.columns[0]].cast(dt.string), ibis.literal(sql))
    return op.to_expr()
@@ -136,23 +146,26 @@ def format_raw_sql(translator, expr):
    rand_col, raw_sql = op.args
    return raw_sql.op().args[0]


def sa_format_raw_sql(translator, expr):
    op = expr.op()
    rand_col, raw_sql = op.args
    return sqlalchemy.text(raw_sql.op().args[0])


_pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64
-IntegerColumn.bit_xor = ibis.expr.api._agg_function('bit_xor', BitXor, True)
+IntegerColumn.bit_xor = ibis.expr.api._agg_function("bit_xor", BitXor, True)
BinaryValue.hash = compile_hash
StringValue.hash = compile_hash
BinaryValue.hashbytes = compile_hashbytes
StringValue.hashbytes = compile_hashbytes
-BigQueryExprTranslator._registry[BitXor] = bq_reduction('BIT_XOR')
+BigQueryExprTranslator._registry[BitXor] = bq_reduction("BIT_XOR")
BigQueryExprTranslator._registry[Hash] = format_hash_bigquery
BigQueryExprTranslator._registry[HashBytes] = format_hashbytes_bigquery
AlchemyExprTranslator._registry[RawSQL] = format_raw_sql
BigQueryExprTranslator._registry[RawSQL] = format_raw_sql
ImpalaExprTranslator._registry[RawSQL] = format_raw_sql
+ImpalaExprTranslator._registry[HashBytes] = format_hashbytes_hive
OracleExprTranslator._registry[RawSQL] = sa_format_raw_sql
TeradataExprTranslator._registry[RawSQL] = format_raw_sql
TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata
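As a rough sketch of what these registrations enable (nothing below is part of the commit; the client, table, and column names are placeholders, and the import path is illustrative): once `HashBytes` is registered for `ImpalaExprTranslator`, a `hashbytes` expression against a Hive-backed table compiles to Hive's `sha2` function.

```
# Hypothetical usage; `impala_client` is a placeholder ibis Impala/Hive client.
import third_party.ibis.ibis_addon.operations  # applies the patches above

t = impala_client.table("my_table")
expr = t.my_string_col.hashbytes(how="sha256")
print(expr.compile())  # SQL containing: sha2(`my_string_col`, 256)
```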
33 changes: 30 additions & 3 deletions third_party/ibis/ibis_impala/api.py
@@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

-from ibis.backends.impala import connect
-from ibis.backends.impala import udf
-from ibis.backends.impala.client import ImpalaClient
+from ibis.backends.base_sql import fixed_arity
+from ibis.backends.impala import connect, udf
+from ibis.backends.impala.compiler import rewrites
+from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _column_batches_to_dataframe
import ibis.expr.datatypes as dt
+import ibis.expr.operations as ops
import ibis.expr.schema as sch

_impala_to_ibis_type = udf._impala_to_ibis_type
@@ -95,5 +97,30 @@ def get_schema(self, table_name, database=None):
    return sch.Schema(names, ibis_types)


def _fetch(self, cursor):
    batches = cursor.fetchall(columnar=True)
    names = []
    for x in cursor.description:
        name = x[0]
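        # Impala can return column names qualified with the generated table
        # alias (e.g. "t0.station_id"); strip that prefix so the DataFrame
        # columns match the expression schema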
        if name.startswith('t0.'):
            name = name[3:]
        names.append(name)
    df = _column_batches_to_dataframe(names, batches)

    # in case of metadata queries there is no expr and
    # self.schema() would raise an exception
    if self.expr is not None:
        return self.schema().apply_to(df)

    return df


@rewrites(ops.IfNull)
def _if_null(expr):
    arg, fill_value = expr.op().args
    return arg.coalesce(fill_value)

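# Illustration: with this rewrite, an expression like t.my_col.fillna(0)
# (an ops.IfNull node; t and my_col are placeholders) is lowered to
# t.my_col.coalesce(0), so the generated SQL uses coalesce() rather than
# the isnull() translation that Hive does not support (see commit notes).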

udf.parse_type = parse_type
ImpalaClient.get_schema = get_schema
+ImpalaQuery._fetch = _fetch

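Taken together, these changes allow checksum-style row validations to run against Hive; a sketch with placeholder connection and table names, mirroring the `docs/examples.md` syntax:

````shell script
data-validation validate row -sc my_hive_conn -tc my_hive_conn -tbls my_db.my_table --primary-keys id --hash '*'
````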