From 9d3c5ecf1babae2c811a30d0820701b124ae1c50 Mon Sep 17 00:00:00 2001
From: ngdav <52801477+ngdav@users.noreply.github.com>
Date: Fri, 25 Mar 2022 15:04:38 -0400
Subject: [PATCH] fix: Impala strings/objects now return None instead of NaN (#406)

* fix: strings/objects now return None instead of NaN

* Previously Impala strings, chars and other objects filled a dataframe with NaN instead of NULL which caused mismatch issues when comparing with BigQuery dataframes.

* Moved _column_batches_to_dataframe and _chunks_to_pandas_array functions from the ibis.backends.impala.client into api.py to override default functionality.

* refactor: import instead of rewrite _HS2_TTypeId_to_dtype
---
 third_party/ibis/ibis_impala/api.py | 90 ++++++++++++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)

diff --git a/third_party/ibis/ibis_impala/api.py b/third_party/ibis/ibis_impala/api.py
index d471e4c44..4e89c3345 100644
--- a/third_party/ibis/ibis_impala/api.py
+++ b/third_party/ibis/ibis_impala/api.py
@@ -15,10 +15,12 @@
 from ibis.backends.base_sql import fixed_arity
 from ibis.backends.impala import connect, udf
 from ibis.backends.impala.compiler import rewrites
-from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _column_batches_to_dataframe
+from ibis.backends.impala.client import ImpalaClient, ImpalaQuery, _HS2_TTypeId_to_dtype
 import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
 import ibis.expr.schema as sch
+import numpy as np
+import pandas as pd
 
 _impala_to_ibis_type = udf._impala_to_ibis_type
 
@@ -105,6 +107,92 @@ def _fetch(self, cursor):
     df = _column_batches_to_dataframe(names, batches)
     return df
 
+def _column_batches_to_dataframe(names, batches):
+    """Assemble a pandas DataFrame from columnar result batches.
+
+    Args:
+        names: Column names, in the order columns appear in each batch.
+        batches: Fetched batches; each must expose a ``columns`` sequence.
+
+    Returns:
+        A DataFrame with one column per name, built by concatenating that
+        column's chunks across all batches.
+    """
+    cols = {}
+    for name, chunks in zip(names, zip(*[b.columns for b in batches])):
+        cols[name] = _chunks_to_pandas_array(chunks)
+    return pd.DataFrame(cols, columns=names)
+
+
+def _chunks_to_pandas_array(chunks):
+    """Concatenate the chunks of one column into a single numpy array.
+
+    Null slots become None for object (string-like) columns and NaN for
+    every other type.
+
+    Args:
+        chunks: Non-empty sequence of column chunks, each exposing
+            ``data_type``, ``values`` and a ``nulls`` bit mask.
+
+    Returns:
+        A numpy array holding every value of the column, in chunk order.
+    """
+    total_length = 0
+    have_nulls = False
+    for c in chunks:
+        total_length += len(c)
+        have_nulls = have_nulls or c.nulls.any()
+
+    type_ = chunks[0].data_type
+    numpy_type = _HS2_TTypeId_to_dtype[type_]
+
+    def fill_nonnull(target, chunks):
+        pos = 0
+        for c in chunks:
+            target[pos : pos + len(c)] = c.values
+            pos += len(c.values)
+
+    def fill(target, chunks, na_rep):
+        pos = 0
+        for c in chunks:
+            # bytereverse() works in place, so flip a copy and leave the
+            # chunk's own null mask untouched.
+            nulls = c.nulls.copy()
+            nulls.bytereverse()
+            bits = np.frombuffer(nulls.tobytes(), dtype='u1')
+            mask = np.unpackbits(bits).view(np.bool_)
+
+            k = len(c)
+
+            dest = target[pos : pos + k]
+            dest[:] = c.values
+            # The mask is padded to whole bytes; only the first k bits count.
+            dest[mask[:k]] = na_rep
+
+            pos += k
+
+    if have_nulls:
+        if numpy_type in ('bool', 'datetime64[ns]'):
+            target = np.empty(total_length, dtype='O')
+            na_rep = np.nan
+        elif numpy_type.startswith('int'):
+            # Integer arrays cannot hold NaN, so widen to float64.
+            target = np.empty(total_length, dtype='f8')
+            na_rep = np.nan
+        elif numpy_type == 'object':
+            # Exact match: object columns report NULL as None, not NaN.
+            target = np.empty(total_length, dtype=object)
+            na_rep = None
+        else:
+            target = np.empty(total_length, dtype=numpy_type)
+            na_rep = np.nan
+
+        fill(target, chunks, na_rep)
+    else:
+        target = np.empty(total_length, dtype=numpy_type)
+        fill_nonnull(target, chunks)
+
+    return target
 
 @rewrites(ops.IfNull)
 def _if_null(expr):