Skip to content

Commit

Permalink
fix: Impala strings/objects now return None instead of NaN (#406)
Browse files Browse the repository at this point in the history
* fix: strings/objects now return None instead of NaN

* Previously Impala strings, chars and other objects filled a dataframe with NaN
instead of NULL which caused mismatch issues when comparing with BigQuery dataframes.
* Moved _column_batches_to_dataframe and _chunks_to_pandas_array functions from the ibis.backends.impala.client into api.py to override default functionality.
* refactor: import instead of rewrite _HS2_TTypeId_to_dtype
  • Loading branch information
ngdav committed Mar 25, 2022
1 parent 7a218d2 commit 9d3c5ec
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion third_party/ibis/ibis_impala/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
from ibis.backends.base_sql import fixed_arity
from ibis.backends.impala import connect, udf
from ibis.backends.impala.compiler import rewrites
from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _column_batches_to_dataframe
from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _HS2_TTypeId_to_dtype
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import numpy as np
import pandas as pd

_impala_to_ibis_type = udf._impala_to_ibis_type

Expand Down Expand Up @@ -105,6 +107,65 @@ def _fetch(self, cursor):
df = _column_batches_to_dataframe(names, batches)
return df

def _column_batches_to_dataframe(names, batches):
cols = {}
for name, chunks in zip(names, zip(*[b.columns for b in batches])):
cols[name] = _chunks_to_pandas_array(chunks)
return pd.DataFrame(cols, columns=names)


def _chunks_to_pandas_array(chunks):
total_length = 0
have_nulls = False
for c in chunks:
total_length += len(c)
have_nulls = have_nulls or c.nulls.any()

type_ = chunks[0].data_type
numpy_type = _HS2_TTypeId_to_dtype[type_]

def fill_nonnull(target, chunks):
pos = 0
for c in chunks:
target[pos : pos + len(c)] = c.values
pos += len(c.values)

def fill(target, chunks, na_rep):
pos = 0
for c in chunks:
nulls = c.nulls.copy()
nulls.bytereverse()
bits = np.frombuffer(nulls.tobytes(), dtype='u1')
mask = np.unpackbits(bits).view(np.bool_)

k = len(c)

dest = target[pos : pos + k]
dest[:] = c.values
dest[mask[:k]] = na_rep

pos += k

if have_nulls:
if numpy_type in ('bool', 'datetime64[ns]'):
target = np.empty(total_length, dtype='O')
na_rep = np.nan
elif numpy_type.startswith('int'):
target = np.empty(total_length, dtype='f8')
na_rep = np.nan
elif numpy_type in ('object'):
target = np.empty(total_length, dtype=object)
na_rep = None
else:
target = np.empty(total_length, dtype=numpy_type)
na_rep = np.nan

fill(target, chunks, na_rep)
else:
target = np.empty(total_length, dtype=numpy_type)
fill_nonnull(target, chunks)

return target

@rewrites(ops.IfNull)
def _if_null(expr):
Expand Down

0 comments on commit 9d3c5ec

Please sign in to comment.