diff --git a/data_validation/partition_builder.py b/data_validation/partition_builder.py index f16e858f7..19ccfef72 100644 --- a/data_validation/partition_builder.py +++ b/data_validation/partition_builder.py @@ -73,16 +73,23 @@ def partition_configs(self) -> None: self._store_partitions(yaml_configs_list) @staticmethod - def _extract_where(x) -> str: + def _extract_where(table_expr, client) -> str: """Given a ibis table expression with a filter (i.e. WHERE) clause, this function extracts the where clause in plain text Returns: String with the where condition """ - return re.sub(r"\s\s+", " ", ibis.to_sql(x).sql.split("WHERE")[1]).replace( - "t0.", "" - ) + if client.name == "spanner": + # As per Issue #1059, use the BQ dialect for Spanner as they have the same query syntax + return re.sub( + r"\s\s+", + " ", + ibis.to_sql(table_expr, dialect="bigquery").sql.split("WHERE")[1], + ).replace("t0.", "") + return re.sub( + r"\s\s+", " ", ibis.to_sql(table_expr).sql.split("WHERE")[1] + ).replace("t0.", "") def _get_partition_key_filters(self) -> List[List[List[str]]]: """Generate where clauses for each partition for each table pair. We have to create separate where for the source and @@ -221,10 +228,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]: first_elements[1, : len(self.primary_keys)], ) source_where_list.append( - self._extract_where(source_table.filter(filter_source_clause)) + self._extract_where( + source_table.filter(filter_source_clause), + config_manager.source_client, + ) ) target_where_list.append( - self._extract_where(target_table.filter(filter_target_clause)) + self._extract_where( + target_table.filter(filter_target_clause), + config_manager.target_client, + ) ) for i in range(1, first_elements.shape[0] - 1): @@ -247,10 +260,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]: first_elements[i + 1, : len(self.primary_keys)], ) source_where_list.append( - self._extract_where(source_table.filter(filter_source_clause)) + self._extract_where( + source_table.filter(filter_source_clause), + config_manager.source_client, + ) ) target_where_list.append( - self._extract_where(target_table.filter(filter_target_clause)) + self._extract_where( + target_table.filter(filter_target_clause), + config_manager.target_client, + ) ) filter_source_clause = geq_value( source_table, @@ -263,10 +282,16 @@ def _get_partition_key_filters(self) -> List[List[List[str]]]: first_elements[len(first_elements) - 1, : len(self.primary_keys)], ) source_where_list.append( - self._extract_where(source_table.filter(filter_source_clause)) + self._extract_where( + source_table.filter(filter_source_clause), + config_manager.source_client, + ) ) target_where_list.append( - self._extract_where(target_table.filter(filter_target_clause)) + self._extract_where( + target_table.filter(filter_target_clause), + config_manager.target_client, + ) ) master_filter_list.append([source_where_list, target_where_list]) return master_filter_list