Skip to content

Commit

Permalink
fix: Spanner generate-partition to use BQ dialect (#1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
nehanene15 committed Dec 5, 2023
1 parent 4d21d65 commit f3cc565
Showing 1 changed file with 35 additions and 10 deletions.
45 changes: 35 additions & 10 deletions data_validation/partition_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,23 @@ def partition_configs(self) -> None:
self._store_partitions(yaml_configs_list)

@staticmethod
def _extract_where(x) -> str:
def _extract_where(table_expr, client) -> str:
"""Given a ibis table expression with a filter (i.e. WHERE) clause, this function extracts the where clause
in plain text
Returns:
String with the where condition
"""
return re.sub(r"\s\s+", " ", ibis.to_sql(x).sql.split("WHERE")[1]).replace(
"t0.", ""
)
if client.name == "spanner":
# As per Issue #1059, use the BQ dialect for Spanner as they have the same query syntax
return re.sub(
r"\s\s+",
" ",
ibis.to_sql(table_expr, dialect="bigquery").sql.split("WHERE")[1],
).replace("t0.", "")
return re.sub(
r"\s\s+", " ", ibis.to_sql(table_expr).sql.split("WHERE")[1]
).replace("t0.", "")

def _get_partition_key_filters(self) -> List[List[List[str]]]:
"""Generate where clauses for each partition for each table pair. We have to create separate where for the source and
Expand Down Expand Up @@ -221,10 +228,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]:
first_elements[1, : len(self.primary_keys)],
)
source_where_list.append(
self._extract_where(source_table.filter(filter_source_clause))
self._extract_where(
source_table.filter(filter_source_clause),
config_manager.source_client,
)
)
target_where_list.append(
self._extract_where(target_table.filter(filter_target_clause))
self._extract_where(
target_table.filter(filter_target_clause),
config_manager.target_client,
)
)

for i in range(1, first_elements.shape[0] - 1):
Expand All @@ -247,10 +260,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]:
first_elements[i + 1, : len(self.primary_keys)],
)
source_where_list.append(
self._extract_where(source_table.filter(filter_source_clause))
self._extract_where(
source_table.filter(filter_source_clause),
config_manager.source_client,
)
)
target_where_list.append(
self._extract_where(target_table.filter(filter_target_clause))
self._extract_where(
target_table.filter(filter_target_clause),
config_manager.target_client,
)
)
filter_source_clause = geq_value(
source_table,
Expand All @@ -263,10 +282,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]:
first_elements[len(first_elements) - 1, : len(self.primary_keys)],
)
source_where_list.append(
self._extract_where(source_table.filter(filter_source_clause))
self._extract_where(
source_table.filter(filter_source_clause),
config_manager.source_client,
)
)
target_where_list.append(
self._extract_where(target_table.filter(filter_target_clause))
self._extract_where(
target_table.filter(filter_target_clause),
config_manager.target_client,
)
)
master_filter_list.append([source_where_list, target_where_list])
return master_filter_list
Expand Down

0 comments on commit f3cc565

Please sign in to comment.