Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Impala strings/objects now return None instead of NaN #406

Merged
merged 2 commits into from
Mar 25, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion third_party/ibis/ibis_impala/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
from ibis.backends.base_sql import fixed_arity
from ibis.backends.impala import connect, udf
from ibis.backends.impala.compiler import rewrites
from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _column_batches_to_dataframe
from ibis.backends.impala.client import ImpalaClient, ImpalaQuery
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import numpy as np
import pandas as pd

_impala_to_ibis_type = udf._impala_to_ibis_type

Expand Down Expand Up @@ -105,6 +107,81 @@ def _fetch(self, cursor):
df = _column_batches_to_dataframe(names, batches)
return df

def _column_batches_to_dataframe(names, batches):
cols = {}
for name, chunks in zip(names, zip(*[b.columns for b in batches])):
cols[name] = _chunks_to_pandas_array(chunks)
return pd.DataFrame(cols, columns=names)


def _chunks_to_pandas_array(chunks):
total_length = 0
have_nulls = False
for c in chunks:
total_length += len(c)
have_nulls = have_nulls or c.nulls.any()

type_ = chunks[0].data_type
numpy_type = _HS2_TTypeId_to_dtype[type_]

def fill_nonnull(target, chunks):
ngdav marked this conversation as resolved.
Show resolved Hide resolved
pos = 0
for c in chunks:
target[pos : pos + len(c)] = c.values
pos += len(c.values)

def fill(target, chunks, na_rep):
pos = 0
for c in chunks:
nulls = c.nulls.copy()
nulls.bytereverse()
bits = np.frombuffer(nulls.tobytes(), dtype='u1')
mask = np.unpackbits(bits).view(np.bool_)

k = len(c)

dest = target[pos : pos + k]
dest[:] = c.values
dest[mask[:k]] = na_rep

pos += k

if have_nulls:
if numpy_type in ('bool', 'datetime64[ns]'):
target = np.empty(total_length, dtype='O')
na_rep = np.nan
elif numpy_type.startswith('int'):
target = np.empty(total_length, dtype='f8')
na_rep = np.nan
elif numpy_type in ('object'):
ngdav marked this conversation as resolved.
Show resolved Hide resolved
target = np.empty(total_length, dtype=object)
na_rep = None
else:
target = np.empty(total_length, dtype=numpy_type)
na_rep = np.nan

fill(target, chunks, na_rep)
else:
target = np.empty(total_length, dtype=numpy_type)
fill_nonnull(target, chunks)

return target

_HS2_TTypeId_to_dtype = {
'BOOLEAN': 'bool',
'TINYINT': 'int8',
'SMALLINT': 'int16',
'INT': 'int32',
'BIGINT': 'int64',
'TIMESTAMP': 'datetime64[ns]',
'FLOAT': 'float32',
'DOUBLE': 'float64',
'STRING': 'object',
'DECIMAL': 'object',
'BINARY': 'object',
'VARCHAR': 'object',
'CHAR': 'object',
}

@rewrites(ops.IfNull)
def _if_null(expr):
Expand Down