Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Hive hash function support #392

Merged
merged 51 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
a6d5521
adding addons for impala hive hashing functions
renzokuken Feb 25, 2022
82941c2
fix: adding nvl function for hive
nehanene15 Mar 3, 2022
9444f5d
fix: import ops
nehanene15 Mar 3, 2022
0b8fd47
fix: import fixed_arity
nehanene15 Mar 3, 2022
a5e218c
merge develop
nehanene15 Mar 3, 2022
babb571
override class
nehanene15 Mar 3, 2022
e98b7f5
override class
nehanene15 Mar 3, 2022
72f1675
override class
nehanene15 Mar 3, 2022
2d2c5ed
override class
nehanene15 Mar 3, 2022
f844b0c
move logic to ibis_addon
nehanene15 Mar 3, 2022
8f1b4ee
replacing isnull with nvl
nehanene15 Mar 3, 2022
fb81cba
adding ifnull
nehanene15 Mar 3, 2022
4d440a4
adding isnull
nehanene15 Mar 3, 2022
42fd92f
adding unaryop import
nehanene15 Mar 3, 2022
a8532e1
adding nvl function
nehanene15 Mar 3, 2022
e3cfefc
trying FillNa
nehanene15 Mar 4, 2022
b1fe078
test FillNa
nehanene15 Mar 4, 2022
6c8be5c
test FillNa
nehanene15 Mar 4, 2022
c4acd0b
missing import
nehanene15 Mar 4, 2022
8927348
debug
nehanene15 Mar 4, 2022
1e38473
debug
nehanene15 Mar 4, 2022
0634c8c
debug
nehanene15 Mar 4, 2022
3d43857
debug
nehanene15 Mar 4, 2022
55ed779
debug
nehanene15 Mar 4, 2022
45586c4
debug
nehanene15 Mar 4, 2022
ef5655b
debug
nehanene15 Mar 4, 2022
6213e00
debug
nehanene15 Mar 4, 2022
68221b9
debug
nehanene15 Mar 4, 2022
6f550b7
debug
nehanene15 Mar 4, 2022
5081748
debug
nehanene15 Mar 4, 2022
cc8329c
debug
nehanene15 Mar 7, 2022
fc9b6b9
debug
nehanene15 Mar 8, 2022
27ebe06
debug
nehanene15 Mar 8, 2022
179960d
debug
nehanene15 Mar 8, 2022
af39f87
debug
nehanene15 Mar 8, 2022
c965dcc
debug
nehanene15 Mar 8, 2022
f0bee9c
debug
nehanene15 Mar 8, 2022
e4d3236
debug
nehanene15 Mar 8, 2022
1ac5d90
debug
nehanene15 Mar 8, 2022
787a6be
debug
nehanene15 Mar 8, 2022
499c585
debug
nehanene15 Mar 8, 2022
2d31103
debug
nehanene15 Mar 8, 2022
de5c097
debug
nehanene15 Mar 9, 2022
11ac6d2
clean up code
nehanene15 Mar 14, 2022
5fddb96
using rewrites decorator
nehanene15 Mar 14, 2022
9a4d7c6
using rewrites decorator
nehanene15 Mar 14, 2022
11cbcec
clean up
nehanene15 Mar 14, 2022
e9fd589
debug
nehanene15 Mar 14, 2022
aa37c3c
revert change
nehanene15 Mar 14, 2022
5a2f351
updating t0 prefix to column names
nehanene15 Mar 15, 2022
5315d45
typo
nehanene15 Mar 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ Please note that for Group By validations, the following property must be set in

`set hive:hive.groupby.orderby.position.alias=true`

If you are running Hive on Dataproc, you will also need to run
`pip install ibis-framework[impala]`
If you are running Hive on Dataproc, you will also need to run
`pip install ibis-framework[impala]`

Currently only INT, BIGINT, FLOAT, and DOUBLE data types are supported for Hive aggregation.

```
{
Expand Down
2 changes: 1 addition & 1 deletion docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ data-validation validate column -sc my_bq_conn -tc my_bq_conn -tbls bigquery-pub

#### Run a checksum validation for all rows
````shell script
data-validation validate row -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_trips --primary-keys station_id --hash '*'
data-validation validate row -sc my_bq_conn -tc my_bq_conn -tbls bigquery-public-data.new_york_citibike.citibike_stations --primary-keys station_id --hash '*'
````

#### Store results in a BigQuery table
Expand Down
51 changes: 32 additions & 19 deletions third_party/ibis/ibis_addon/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,11 @@
import sqlalchemy

import ibis.expr.api
from ibis_bigquery.compiler import (
reduction as bq_reduction,
BigQueryExprTranslator
)
from ibis_bigquery.compiler import reduction as bq_reduction, BigQueryExprTranslator
import ibis.expr.datatypes as dt
from ibis.expr.operations import (
Arg, Comparison, Reduction, ValueOp
)
from ibis.expr.operations import Arg, Comparison, Reduction, ValueOp
import ibis.expr.rules as rlz
from ibis.expr.types import (
BinaryValue, IntegerColumn, StringValue
)
from ibis.expr.types import BinaryValue, IntegerColumn, StringValue
from ibis.backends.impala.compiler import ImpalaExprTranslator
from ibis.backends.pandas import client as _pandas_client
from ibis.backends.base_sqlalchemy.alchemy import AlchemyExprTranslator
Expand All @@ -56,27 +49,29 @@ class BitXor(Reduction):

arg = Arg(rlz.column(rlz.integer))
where = Arg(rlz.boolean, default=None)
output_type = rlz.scalar_like('arg')
output_type = rlz.scalar_like("arg")


class Hash(ValueOp):
    """Hash a value to an int64 using a named algorithm.

    The diff residue left duplicate `how`/`output_type` assignments (old
    single-quoted and new double-quoted variants); only the final,
    double-quoted pair takes effect, so the dead duplicates are removed.
    """

    arg = Arg(rlz.any)
    # Only these algorithms are accepted; backends raise for anything else.
    how = Arg(rlz.isin({"fnv", "farm_fingerprint"}))
    output_type = rlz.shape_like("arg", dt.int64)


class HashBytes(ValueOp):
    """Hash a string/binary value to bytes using a named algorithm.

    Duplicate `how`/`output_type` assignments (old single-quoted variants
    left by the diff) removed; only the final assignments were effective.
    """

    arg = Arg(rlz.one_of([rlz.value(dt.string), rlz.value(dt.binary)]))
    # NOTE(review): format_hashbytes_hive also handles "md5", which this
    # whitelist rejects — confirm whether "md5" should be added here.
    how = Arg(rlz.isin({"sha256", "farm_fingerprint"}))
    output_type = rlz.shape_like("arg", "binary")


class RawSQL(Comparison):
    """Marker op that smuggles a raw SQL string through the expression tree;
    the stored text is emitted verbatim by format_raw_sql / sa_format_raw_sql.
    """
    pass


def compile_hash(binary_value, how):
    """Build a Hash expression over *binary_value* using algorithm *how*.

    Bound below as the ``.hash`` method on BinaryValue and StringValue.
    *how* must be one of the algorithms the Hash op accepts
    ("fnv" or "farm_fingerprint").

    The diff left two identical definitions differing only in parameter
    name (``numeric_value`` vs ``binary_value``); the first was dead code
    shadowed by the second, so a single definition remains.
    """
    return Hash(binary_value, how=how).to_expr()

Expand All @@ -87,15 +82,16 @@ def format_hash_bigquery(translator, expr):

arg_formatted = translator.translate(arg)

if how == 'farm_fingerprint':
return f'farm_fingerprint({arg_formatted})'
if how == "farm_fingerprint":
return f"farm_fingerprint({arg_formatted})"
else:
raise NotImplementedError(how)


def compile_hashbytes(binary_value, how):
    """Build a HashBytes expression over *binary_value* for algorithm *how*.

    Bound below as the ``.hashbytes`` method on BinaryValue and StringValue.
    """
    op = HashBytes(binary_value, how=how)
    return op.to_expr()


def format_hash_bigquery(translator, expr):
arg, how = expr.op().args
compiled_arg = translator.translate(arg)
Expand All @@ -104,6 +100,7 @@ def format_hash_bigquery(translator, expr):
else:
raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_bigquery(translator, expr):
arg, how = expr.op().args
compiled_arg = translator.translate(arg)
Expand All @@ -114,6 +111,7 @@ def format_hashbytes_bigquery(translator, expr):
else:
raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_teradata(translator, expr):
arg, how = expr.op().args
compiled_arg = translator.translate(arg)
Expand All @@ -126,6 +124,18 @@ def format_hashbytes_teradata(translator, expr):
else:
raise ValueError(f"unexpected value for 'how': {how}")


def format_hashbytes_hive(translator, expr):
    """Translate a HashBytes expression into Hive SQL.

    Supports "sha256" (emitted as Hive's sha2 with bit length 256) and
    "md5"; any other algorithm raises ValueError.
    """
    column, algorithm = expr.op().args
    rendered = translator.translate(column)
    if algorithm == "md5":
        return f"md5({rendered})"
    if algorithm == "sha256":
        return f"sha2({rendered}, 256)"
    raise ValueError(f"unexpected value for 'how': {algorithm}")


def compile_raw_sql(table, sql):
    """Wrap literal SQL text in a RawSQL expression anchored to *table*.

    The first column of *table* (cast to string) is used only as an anchor
    so the op participates in the expression tree; the SQL text itself is
    carried as a literal and later emitted verbatim.
    """
    anchor = table[table.columns[0]].cast(dt.string)
    return RawSQL(anchor, ibis.literal(sql)).to_expr()
Expand All @@ -136,23 +146,26 @@ def format_raw_sql(translator, expr):
rand_col, raw_sql = op.args
return raw_sql.op().args[0]


def sa_format_raw_sql(translator, expr):
    """Emit the SQL text stored in a RawSQL op as a SQLAlchemy text() clause
    (SQLAlchemy-backed translators, e.g. Oracle)."""
    _, raw_sql = expr.op().args
    return sqlalchemy.text(raw_sql.op().args[0])


# Teach the pandas backend's dtype inference about "floating" columns.
_pandas_client._inferable_pandas_dtypes["floating"] = _pandas_client.dt.float64

# Expression-level helpers: expose bit_xor / hash / hashbytes on column types.
# (The diff residue carried duplicate bit_xor and BIT_XOR registrations in
# both old single-quoted and new double-quoted form; only one of each is
# needed since the later assignment simply overwrote the earlier.)
IntegerColumn.bit_xor = ibis.expr.api._agg_function("bit_xor", BitXor, True)
BinaryValue.hash = compile_hash
StringValue.hash = compile_hash
BinaryValue.hashbytes = compile_hashbytes
StringValue.hashbytes = compile_hashbytes

# Per-backend translator registrations for the custom ops above.
BigQueryExprTranslator._registry[BitXor] = bq_reduction("BIT_XOR")
BigQueryExprTranslator._registry[Hash] = format_hash_bigquery
BigQueryExprTranslator._registry[HashBytes] = format_hashbytes_bigquery
AlchemyExprTranslator._registry[RawSQL] = format_raw_sql
BigQueryExprTranslator._registry[RawSQL] = format_raw_sql
ImpalaExprTranslator._registry[RawSQL] = format_raw_sql
ImpalaExprTranslator._registry[HashBytes] = format_hashbytes_hive
OracleExprTranslator._registry[RawSQL] = sa_format_raw_sql
TeradataExprTranslator._registry[RawSQL] = format_raw_sql
TeradataExprTranslator._registry[HashBytes] = format_hashbytes_teradata
33 changes: 30 additions & 3 deletions third_party/ibis/ibis_impala/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ibis.backends.impala import connect
from ibis.backends.impala import udf
from ibis.backends.impala.client import ImpalaClient
from ibis.backends.base_sql import fixed_arity
from ibis.backends.impala import connect, udf
from ibis.backends.impala.compiler import rewrites
from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _column_batches_to_dataframe
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch

_impala_to_ibis_type = udf._impala_to_ibis_type
Expand Down Expand Up @@ -95,5 +97,30 @@ def get_schema(self, table_name, database=None):
return sch.Schema(names, ibis_types)


def _fetch(self, cursor):
    """Fetch query results into a DataFrame, stripping the leading 't0.'
    table-alias prefix that the backend prepends to column names.

    Patched onto ImpalaQuery below so downstream code sees bare column
    names.
    """
    batches = cursor.fetchall(columnar=True)
    names = []
    for descriptor in cursor.description:
        column = descriptor[0]
        if column.startswith('t0.'):
            column = column[len('t0.'):]
        names.append(column)
    frame = _column_batches_to_dataframe(names, batches)

    # Metadata queries carry no expr, and self.schema() would raise for
    # them — only apply the schema when an expr is present.
    if self.expr is None:
        return frame
    return self.schema().apply_to(frame)


@rewrites(ops.IfNull)
def _if_null(expr):
    """Rewrite IfNull(arg, fill) into coalesce(arg, fill) so the Impala
    compiler emits COALESCE instead of an unsupported isnull form."""
    op = expr.op()
    value, fallback = op.args
    return value.coalesce(fallback)


# Monkeypatch the Impala backend: swap in the local parse_type / get_schema
# (defined above in this module) and the alias-stripping _fetch so query
# results come back with clean column names.
udf.parse_type = parse_type
ImpalaClient.get_schema = get_schema
ImpalaQuery._fetch = _fetch