From 321b96aaea3545f9da1a526c5e8f2e77d4fff8ed Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Sun, 9 Jun 2024 21:15:21 -0400 Subject: [PATCH] * Add exception mappers to convert storage failures to Iceberg REST client exceptions * Add AccessCheckHandler to ObjectStorageMock for simulating access failures in tests. Closes #8738 --- .../files/api/BackendThrottledException.java | 84 ------------ .../files/api/ObjectIOExceptionMapper.java | 80 +++++++++++ ...ctIOException.java => ObjectIOStatus.java} | 25 ++-- .../catalog/files/api/RetryableException.java | 46 ------- .../api/TestBackendThrottledException.java | 129 ------------------ .../files/s3/S3ClientResourceBench.java | 7 +- .../catalog/files/ResolvingObjectIO.java | 3 +- .../files/adls/AdlsExceptionMapper.java | 34 +++++ .../catalog/files/gcs/GcsExceptionMapper.java | 36 +++++ .../catalog/files/s3/S3ClientSupplier.java | 4 - .../catalog/files/s3/S3Config.java | 8 -- .../catalog/files/s3/S3ExceptionMapper.java | 35 +++++ .../catalog/files/s3/S3ObjectIO.java | 25 +--- .../catalog/files/s3/TestS3Clients.java | 3 +- .../catalog/files/s3/TestS3ObjectIO.java | 76 ----------- .../service/impl/CatalogServiceImpl.java | 23 ++-- .../impl/EntitySnapshotTaskBehavior.java | 29 +++- .../impl/EntitySnapshotTaskRequest.java | 8 +- .../catalog/service/impl/IcebergStuff.java | 19 ++- .../service/impl/ImportSnapshotWorker.java | 62 ++++----- .../catalog/service/impl/Util.java | 41 ------ .../service/impl/AbstractCatalogService.java | 13 +- .../service/impl/TestIcebergStuff.java | 18 ++- .../catalog/service/impl/TestUtil.java | 56 -------- .../service/rest/IcebergErrorMapper.java | 62 ++++++--- .../nessie/cli/commands/WithNessie.java | 3 +- servers/quarkus-common/build.gradle.kts | 1 + .../quarkus/config/CatalogS3Config.java | 5 - .../quarkus/config/CatalogServiceConfig.java | 24 ++++ .../s3/ITS3AssumeRoleIcebergCatalog.java | 2 + .../server/catalog/CatalogProducers.java | 27 ++++ 
.../catalog/MonitoredTaskServiceMetrics.java | 3 + .../AbstractIcebergCatalogUnitTests.java | 60 ++++++++ ...orageMockTestResourceLifecycleManager.java | 20 +++ .../service/impl/TaskServiceMetrics.java | 2 + .../objectstoragemock/AccessCheckHandler.java | 29 +--- .../objectstoragemock/AdlsGen2Resource.java | 17 ++- .../objectstoragemock/GcsResource.java | 21 +++ .../objectstoragemock/ObjectStorageMock.java | 5 + .../objectstoragemock/S3Resource.java | 28 ++++ 40 files changed, 571 insertions(+), 602 deletions(-) delete mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java create mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java rename catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/{ObjectIOException.java => ObjectIOStatus.java} (60%) delete mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java delete mode 100644 catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java delete mode 100644 catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java rename catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java => testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java (51%) diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java deleted file 
mode 100644 index 09862bb27b2..00000000000 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; -import static java.time.temporal.ChronoUnit.SECONDS; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.format.DateTimeParseException; -import java.util.Optional; -import java.util.function.Function; - -public class BackendThrottledException extends RetryableException { - public BackendThrottledException(Instant retryNotBefore, Throwable cause) { - super(retryNotBefore, cause); - } - - public BackendThrottledException(Instant retryNotBefore, String message) { - super(retryNotBefore, message); - } - - public BackendThrottledException(Instant retryNotBefore, String message, Throwable cause) { - super(retryNotBefore, message, cause); - } - - /** - * Provides a {@link BackendThrottledException} instance for HTTP responses, for the HTTP status - * codes 429 (too many requests) + 503 (service unavailable), handling HTTP responses having the - * {@code Retry-After} header. 
- */ - public static Optional fromHttpStatusCode( - int statusCode, - Function headers, - Clock clock, - Function exceptionSupplier, - Duration defaultRetryAfter) { - switch (statusCode) { - case 429: - case 503: - String retryAfter = headers.apply("Retry-After"); - Instant retryNotBefore = null; - Instant now = clock.instant(); - if (retryAfter != null) { - try { - int seconds = Integer.parseInt(retryAfter); - if (seconds > 0) { - retryNotBefore = now.plus(seconds, SECONDS); - } - } catch (NumberFormatException e) { - try { - retryNotBefore = Instant.from(RFC_1123_DATE_TIME.parse(retryAfter)); - if (retryNotBefore.compareTo(now) <= 0) { - retryNotBefore = null; - } - } catch (DateTimeParseException e2) { - // - } - } - } - if (retryNotBefore == null) { - retryNotBefore = now.plus(defaultRetryAfter); - } - return Optional.of(exceptionSupplier.apply(retryNotBefore)); - default: - break; - } - return Optional.empty(); - } -} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java new file mode 100644 index 00000000000..3dceb66d853 --- /dev/null +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java @@ -0,0 +1,80 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.catalog.files.api; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public abstract class ObjectIOExceptionMapper { + + protected abstract List analyzers(); + + protected abstract Duration retryAfterThrottled(); + + protected abstract Duration retryAfterNetworkError(); + + protected abstract Duration reattemptAfterFetchError(); + + protected abstract Clock clock(); + + public static ImmutableObjectIOExceptionMapper.Builder builder() { + return ImmutableObjectIOExceptionMapper.builder(); + } + + public Optional analyze(Throwable ex) { + for (Throwable th = ex; th != null; th = th.getCause()) { + for (Analyzer analyzer : analyzers()) { + OptionalInt statusCode = analyzer.httpStatusCode(th); + if (statusCode.isPresent()) { + return Optional.of(toStatus(statusCode.getAsInt(), th)); + } + } + } + + return Optional.empty(); + } + + private Instant retryAfter(Duration delay) { + return clock().instant().plus(delay); + } + + private ObjectIOStatus toStatus(int httpStatus, Throwable th) { + switch (httpStatus) { + case 408: + case 425: + case 429: + return ObjectIOStatus.of(httpStatus, true, retryAfter(retryAfterThrottled()), th); + + case 502: + case 503: + case 504: + return ObjectIOStatus.of(httpStatus, true, retryAfter(retryAfterNetworkError()), th); + + default: + return ObjectIOStatus.of(httpStatus, false, retryAfter(reattemptAfterFetchError()), th); + } + } + + public interface Analyzer { + OptionalInt httpStatusCode(Throwable th); + } +} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java similarity index 60% rename from 
catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java rename to catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java index 45e0d58c30d..c9b78d9ebea 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java @@ -15,24 +15,21 @@ */ package org.projectnessie.catalog.files.api; -import java.io.IOException; import java.time.Instant; -import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; -public abstract class ObjectIOException extends IOException { - public ObjectIOException(Throwable cause) { - super(cause); - } +@NessieImmutable +public interface ObjectIOStatus { + int httpStatusCode(); - public ObjectIOException(String message) { - super(message); - } + boolean isRetryable(); - public ObjectIOException(String message, Throwable cause) { - super(message, cause); - } + Instant reattemptAfter(); - public abstract boolean isRetryable(); + Throwable cause(); - public abstract Optional retryNotBefore(); + static ObjectIOStatus of( + int httpCode, boolean retryable, Instant reattemptAfter, Throwable cause) { + return ImmutableObjectIOStatus.of(httpCode, retryable, reattemptAfter, cause); + } } diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java deleted file mode 100644 index 4d8e4a693b0..00000000000 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import java.time.Instant; -import java.util.Optional; - -public class RetryableException extends ObjectIOException { - private final Instant retryNotBefore; - - public RetryableException(Instant retryNotBefore, Throwable cause) { - super(cause); - this.retryNotBefore = retryNotBefore; - } - - public RetryableException(Instant retryNotBefore, String message) { - super(message); - this.retryNotBefore = retryNotBefore; - } - - public RetryableException(Instant retryNotBefore, String message, Throwable cause) { - super(message, cause); - this.retryNotBefore = retryNotBefore; - } - - public boolean isRetryable() { - return true; - } - - public Optional retryNotBefore() { - return Optional.of(retryNotBefore); - } -} diff --git a/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java b/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java deleted file mode 100644 index 8f3e8cc1a5c..00000000000 --- a/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import static java.time.Clock.systemUTC; -import static java.time.Instant.now; -import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; -import static java.time.temporal.ChronoUnit.DAYS; -import static java.time.temporal.ChronoUnit.SECONDS; -import static org.assertj.core.api.InstanceOfAssertFactories.optional; -import static org.junit.jupiter.params.provider.Arguments.arguments; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; -import java.util.stream.Stream; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -@ExtendWith(SoftAssertionsExtension.class) -public class TestBackendThrottledException { - @InjectSoftAssertions protected SoftAssertions soft; - - @Test - public void nonRetryableHttp() { - for (int i = 0; i <= 999; i++) { - // HTTP 429 and 503 - if (i == 429 || i == 503) { - continue; - } - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - i, - header -> null, - systemUTC(), - instant -> new BackendThrottledException(instant, "hello"), - Duration.of(10, SECONDS))) - .isEmpty(); - } - } - - static Instant now = 
now().truncatedTo(SECONDS); - - @ParameterizedTest - @MethodSource - public void retryableDefaults(String retryAfterHeader) { - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration fallback = Duration.of(10, SECONDS); - Instant expected = clock.instant().plus(fallback); - - for (int statusCode : new int[] {429, 503}) { - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - statusCode, - header -> retryAfterHeader, - clock, - instant -> new BackendThrottledException(instant, "hello"), - fallback)) - .get() - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(expected); - } - } - - static Stream retryableDefaults() { - return Stream.of( - null, - "dummy", - "0", - "-1", - "" + Integer.MIN_VALUE, - formatInstant(now), - formatInstant(now.minus(1, SECONDS))); - } - - static String formatInstant(Instant instant) { - return RFC_1123_DATE_TIME.format(instant.atZone(ZoneId.of("UTC"))); - } - - @ParameterizedTest - @MethodSource - public void retryableWithHeader(String retryAfterHeader, Instant expected) { - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration fallback = Duration.of(10, SECONDS); - - for (int statusCode : new int[] {429, 503}) { - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - statusCode, - header -> retryAfterHeader, - clock, - instant -> new BackendThrottledException(instant, "hello"), - fallback)) - .get() - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(expected); - } - } - - static Stream retryableWithHeader() { - return Stream.of( - arguments("42", now.plus(42, SECONDS)), - arguments(formatInstant(now.plus(42, SECONDS)), now.plus(42, SECONDS)), - arguments("86400", now.plus(86400, SECONDS)), - arguments(formatInstant(now.plus(123, DAYS)), now.plus(123, DAYS))); - } -} diff --git a/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java 
b/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java index b94e824ec83..918238681a7 100644 --- a/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java +++ b/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; -import java.time.Clock; import java.util.Map; import java.util.stream.Collectors; import org.openjdk.jmh.annotations.Benchmark; @@ -105,7 +104,7 @@ public void s3client(BenchmarkParam param, Blackhole bh) { @Benchmark public void s3Get(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/key"))) { bh.consume(in.readAllBytes()); } @@ -113,7 +112,7 @@ public void s3Get(BenchmarkParam param, Blackhole bh) throws IOException { @Benchmark public void s3Get250k(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/s-256000"))) { bh.consume(in.readAllBytes()); } @@ -121,7 +120,7 @@ public void s3Get250k(BenchmarkParam param, Blackhole bh) throws IOException { @Benchmark public void s3Get4M(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/s-4194304"))) { bh.consume(in.readAllBytes()); } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java 
b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java index d8a6ba86f70..260002539e1 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java @@ -16,7 +16,6 @@ package org.projectnessie.catalog.files; import java.io.IOException; -import java.time.Clock; import java.util.ArrayList; import java.util.IdentityHashMap; import java.util.List; @@ -40,7 +39,7 @@ public ResolvingObjectIO( AdlsClientSupplier adlsClientSupplier, GcsStorageSupplier gcsStorageSupplier) { this( - new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()), + new S3ObjectIO(s3ClientSupplier), new GcsObjectIO(gcsStorageSupplier), new AdlsObjectIO(adlsClientSupplier)); } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java new file mode 100644 index 00000000000..3d7444c9848 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.catalog.files.adls; + +import com.azure.storage.blob.models.BlobStorageException; +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; + +public class AdlsExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + public static final AdlsExceptionMapper INSTANCE = new AdlsExceptionMapper(); + + private AdlsExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof BlobStorageException) { + return OptionalInt.of(((BlobStorageException) th).getStatusCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java new file mode 100644 index 00000000000..6cbf74617a6 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.gcs; + +import com.google.cloud.storage.StorageException; +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; + +/** Extracts storage-side HTTP status code from GCS client exceptions. 
*/ +public final class GcsExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + + public static final GcsExceptionMapper INSTANCE = new GcsExceptionMapper(); + + private GcsExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof StorageException) { + return OptionalInt.of(((StorageException) th).getCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java index af295993467..a0acefd8a2d 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java @@ -62,10 +62,6 @@ public S3ClientSupplier( this.sessions = sessions; } - public S3Config s3config() { - return s3config; - } - public S3Options s3options() { return s3options; } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java index cd491677dd2..779e3c61161 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java @@ -123,11 +123,6 @@ public interface S3Config { @ConfigItem(section = "transport") Optional keyStorePassword(); - /** - * Interval after which a request is retried when S3 response with some "retry later" response. 
- */ - Optional retryAfter(); - static Builder builder() { return ImmutableS3Config.builder(); } @@ -176,9 +171,6 @@ interface Builder { @CanIgnoreReturnValue Builder keyStorePassword(KeySecret keyStorePassword); - @CanIgnoreReturnValue - Builder retryAfter(Duration retryAfter); - S3Config build(); } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java new file mode 100644 index 00000000000..35b0b94e485 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.catalog.files.s3; + +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import software.amazon.awssdk.core.exception.SdkServiceException; + +public class S3ExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + + public static final S3ExceptionMapper INSTANCE = new S3ExceptionMapper(); + + private S3ExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof SdkServiceException) { + return OptionalInt.of(((SdkServiceException) th).statusCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java index 20c73a0c278..5e3f5f0752e 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java @@ -16,26 +16,18 @@ package org.projectnessie.catalog.files.s3; import static com.google.common.base.Preconditions.checkArgument; -import static java.time.temporal.ChronoUnit.SECONDS; import static org.projectnessie.catalog.files.s3.S3Utils.isS3scheme; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.time.Clock; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.catalog.files.api.NonRetryableException; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.storage.uri.StorageUri; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import software.amazon.awssdk.core.exception.SdkServiceException; import 
software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; @@ -45,14 +37,11 @@ import software.amazon.awssdk.services.s3.model.PutObjectRequest; public class S3ObjectIO implements ObjectIO { - private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectIO.class); private final S3ClientSupplier s3clientSupplier; - private final Clock clock; - public S3ObjectIO(S3ClientSupplier s3clientSupplier, Clock clock) { + public S3ObjectIO(S3ClientSupplier s3clientSupplier) { this.s3clientSupplier = s3clientSupplier; - this.clock = clock; } @Override @@ -79,16 +68,8 @@ public InputStream readObject(StorageUri uri) throws IOException { .bucket(uri.requiredAuthority()) .key(withoutLeadingSlash(uri)) .build()); - } catch (SdkServiceException e) { - if (e.isThrottlingException()) { - throw new BackendThrottledException( - clock - .instant() - .plus(s3clientSupplier.s3config().retryAfter().orElse(Duration.of(10, SECONDS))), - "S3 throttled", - e); - } - throw new NonRetryableException(e); + } catch (Exception e) { + throw new IOException(e); } } diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java index 8db6839004a..ff936e5e247 100644 --- a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java +++ b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java @@ -25,7 +25,6 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.security.KeyStore; -import java.time.Clock; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; @@ -90,7 +89,7 @@ protected ObjectIO buildObjectIO( names.stream() .collect(Collectors.toMap(identity(), k -> Map.of("secret", "secret")))), null); - return new S3ObjectIO(supplier, 
Clock.systemUTC()); + return new S3ObjectIO(supplier); } @Override diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java deleted file mode 100644 index 00852298754..00000000000 --- a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.projectnessie.catalog.files.s3; - -import static java.time.temporal.ChronoUnit.SECONDS; -import static org.assertj.core.api.InstanceOfAssertFactories.optional; -import static org.assertj.core.api.InstanceOfAssertFactories.type; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.storage.uri.StorageUri; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3Utilities; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Exception; - -@ExtendWith(SoftAssertionsExtension.class) -public class TestS3ObjectIO { - @InjectSoftAssertions protected SoftAssertions soft; - - @SuppressWarnings("resource") - @Test - public void readObjectThrottledThrowsBackendThrottledException() { - S3Client s3client = mock(S3Client.class); - - Instant now = Instant.now(); - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration defaultRetryAfter = Duration.of(10, SECONDS); - StorageUri location = StorageUri.of("s3://hello/foo/bar"); - - when(s3client.utilities()) - .thenReturn(S3Utilities.builder().region(Region.EU_CENTRAL_1).build()); - - when(s3client.getObject(any(GetObjectRequest.class))) - .thenThrow(S3Exception.builder().statusCode(429).message("blah").build()); - - S3ClientSupplier s3ClientSupplier = mock(S3ClientSupplier.class); - 
when(s3ClientSupplier.getClient(location)).thenReturn(s3client); - when(s3ClientSupplier.s3config()) - .thenReturn(S3Config.builder().retryAfter(defaultRetryAfter).build()); - - S3ObjectIO objectIO = new S3ObjectIO(s3ClientSupplier, clock); - - soft.assertThatThrownBy(() -> objectIO.readObject(location)) - .isInstanceOf(BackendThrottledException.class) - .asInstanceOf(type(BackendThrottledException.class)) - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(now.plus(defaultRetryAfter)); - } -} diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index cd94a2ba9d6..4080314507f 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -43,7 +43,6 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import jakarta.inject.Named; -import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; @@ -117,12 +116,17 @@ public class CatalogServiceImpl implements CatalogService { @Inject NessieApiV2 nessieApi; @Inject Persist persist; @Inject TasksService tasksService; + @Inject EntitySnapshotTaskBehavior snapshotTaskBehavior; @Inject CatalogConfig catalogConfig; @Inject @Named("import-jobs") Executor executor; + private IcebergStuff icebergStuff() { + return new IcebergStuff(objectIO, persist, tasksService, snapshotTaskBehavior, executor); + } + @Override public Stream>> retrieveSnapshots( SnapshotReqParams reqParams, @@ -145,7 +149,7 @@ public Stream>> retrieveSnapshots( .keys(keys) .getWithResponse(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = icebergStuff(); 
Reference effectiveReference = contentResponse.getEffectiveReference(); effectiveReferenceConsumer.accept(effectiveReference); @@ -213,8 +217,7 @@ public CompletionStage retrieveSnapshot( ObjId snapshotId = snapshotIdFromContent(content); CompletionStage> snapshotStage = - new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + icebergStuff().retrieveIcebergSnapshot(snapshotId, content); return snapshotStage.thenApply( snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); @@ -356,7 +359,7 @@ CompletionStage commit(ParsedReference reference, CatalogCommi Map contents = contentsResponse.toContentsMap(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = icebergStuff(); CommitMultipleOperationsBuilder nessieCommit = nessieApi.commitMultipleOperations().branch(target); @@ -659,14 +662,12 @@ private List pruneUpdates(IcebergCatalogOperation op, boo private CompletionStage loadExistingTableSnapshot(Content content) { ObjId snapshotId = snapshotIdFromContent(content); - return new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } private CompletionStage loadExistingViewSnapshot(Content content) { ObjId snapshotId = snapshotIdFromContent(content); - return new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } private IcebergTableMetadata storeTableSnapshot( @@ -690,8 +691,8 @@ private M storeSnapshot( multiTableUpdate.addStoredLocation(metadataJsonLocation); try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { IcebergJson.objectMapper().writeValue(out, metadata); - } catch (IOException ex) { - throw new RuntimeException(ex); + } catch 
(Exception ex) { + throw new RuntimeException("Failed to write snapshot to: " + metadataJsonLocation, ex); } return metadata; } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java index 5c7ee2fd699..2b7222008fb 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java @@ -17,21 +17,26 @@ import static java.time.temporal.ChronoUnit.MINUTES; import static java.time.temporal.ChronoUnit.SECONDS; -import static org.projectnessie.catalog.service.impl.Util.throwableAsErrorTaskState; +import static org.projectnessie.nessie.tasks.api.TaskState.failureState; +import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static org.projectnessie.nessie.tasks.api.TaskState.runningState; import java.time.Clock; import java.time.Instant; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj; import org.projectnessie.nessie.tasks.api.TaskBehavior; import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjType; -final class EntitySnapshotTaskBehavior +public final class EntitySnapshotTaskBehavior implements TaskBehavior { - static final EntitySnapshotTaskBehavior INSTANCE = new EntitySnapshotTaskBehavior(); + private final ObjectIOExceptionMapper exceptionMapper; - private EntitySnapshotTaskBehavior() {} + public EntitySnapshotTaskBehavior(ObjectIOExceptionMapper exceptionMapper) { + this.exceptionMapper = exceptionMapper; + } @Override public Throwable stateAsException(EntitySnapshotObj obj) { @@ -43,6 +48,10 @@ public Instant 
performRunningStateUpdateAt(Clock clock, EntitySnapshotObj runnin return clock.instant().plus(2, SECONDS); } + private TaskState taskState(@Nullable EntitySnapshotObj obj) { + return obj == null ? null : obj.taskState(); + } + @Override public TaskState runningTaskState(Clock clock, EntitySnapshotObj running) { Instant now = clock.instant(); @@ -63,6 +72,16 @@ public ObjType objType() { @Override public TaskState asErrorTaskState(Clock clock, EntitySnapshotObj base, Throwable t) { - return throwableAsErrorTaskState(t); + return exceptionMapper + .analyze(t) + .map( + status -> { + if (status.isRetryable()) { + return retryableErrorState(status.reattemptAfter(), t.toString()); + } else { + return failureState(t.toString()); + } + }) + .orElseGet(() -> failureState(t.toString())); } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java index dfed6d2e08b..cf32f27d4fd 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java @@ -43,10 +43,7 @@ default ObjType objType() { } @Override - @Value.NonAttribute - default TaskBehavior behavior() { - return EntitySnapshotTaskBehavior.INSTANCE; - } + TaskBehavior behavior(); @Override ObjId objId(); @@ -92,10 +89,11 @@ static EntitySnapshotTaskRequest entitySnapshotTaskRequest( ObjId objId, Content content, NessieEntitySnapshot snapshot, + EntitySnapshotTaskBehavior behavior, Persist persist, ObjectIO objectIO, Executor executor) { return ImmutableEntitySnapshotTaskRequest.of( - objId, content, snapshot, persist, objectIO, executor); + behavior, objId, content, snapshot, persist, objectIO, executor); } } diff --git 
a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java index 0854a4a45e4..5b359aa245e 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java @@ -43,13 +43,19 @@ public class IcebergStuff { private final ObjectIO objectIO; private final Persist persist; private final TasksService tasksService; + private final EntitySnapshotTaskBehavior snapshotTaskBehavior; private final Executor executor; public IcebergStuff( - ObjectIO objectIO, Persist persist, TasksService tasksService, Executor executor) { + ObjectIO objectIO, + Persist persist, + TasksService tasksService, + EntitySnapshotTaskBehavior snapshotTaskBehavior, + Executor executor) { this.objectIO = objectIO; this.persist = persist; this.tasksService = tasksService; + this.snapshotTaskBehavior = snapshotTaskBehavior; this.executor = executor; } @@ -61,7 +67,8 @@ public IcebergStuff( public > CompletionStage retrieveIcebergSnapshot( ObjId snapshotId, Content content) { EntitySnapshotTaskRequest snapshotTaskRequest = - entitySnapshotTaskRequest(snapshotId, content, null, persist, objectIO, executor); + entitySnapshotTaskRequest( + snapshotId, content, null, snapshotTaskBehavior, persist, objectIO, executor); return triggerIcebergSnapshot(snapshotTaskRequest); } @@ -92,7 +99,13 @@ public > CompletionStage storeSnapshot( S snapshot, Content content) { EntitySnapshotTaskRequest snapshotTaskRequest = entitySnapshotTaskRequest( - nessieIdToObjId(snapshot.id()), content, snapshot, persist, objectIO, executor); + nessieIdToObjId(snapshot.id()), + content, + snapshot, + snapshotTaskBehavior, + persist, + objectIO, + executor); return triggerIcebergSnapshot(snapshotTaskRequest); } diff --git 
a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java index de0468b38eb..9183fc48c96 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java @@ -24,7 +24,6 @@ import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; import static org.projectnessie.versioned.storage.common.persist.ObjIdHasher.objIdHasher; -import java.io.IOException; import java.io.InputStream; import java.time.Instant; import java.util.UUID; @@ -66,31 +65,18 @@ final class ImportSnapshotWorker { EntitySnapshotObj.Builder importSnapshot() { Content content = taskRequest.content(); if (content instanceof IcebergTable) { - try { - return importIcebergTable( - (IcebergTable) content, (NessieTableSnapshot) taskRequest.snapshot()); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } + return importIcebergTable( + (IcebergTable) content, (NessieTableSnapshot) taskRequest.snapshot()); } if (content instanceof IcebergView) { - try { - return importIcebergView( - (IcebergView) content, (NessieViewSnapshot) taskRequest.snapshot()); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } + return importIcebergView((IcebergView) content, (NessieViewSnapshot) taskRequest.snapshot()); } throw new UnsupportedOperationException("Unsupported Nessie content type " + content.getType()); } private EntitySnapshotObj.Builder importIcebergTable( - IcebergTable content, NessieTableSnapshot snapshot) throws Exception { + IcebergTable content, NessieTableSnapshot snapshot) { NessieId snapshotId = objIdToNessieId(taskRequest.objId()); LOGGER.debug( @@ -107,8 +93,9 @@ private 
EntitySnapshotObj.Builder importIcebergTable( // snapshot!=null means that we already have the snapshot (via a committing operation - like a // table update) and do not need to import it but can just store it. if (snapshot == null) { - IcebergTableMetadata tableMetadata; StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation()); + NessieTable table; + IcebergTableMetadata tableMetadata; try { InputStream input = taskRequest.objectIO().readObject(metadataLocation); if (metadataLocation.requiredPath().endsWith(".gz") @@ -116,13 +103,12 @@ private EntitySnapshotObj.Builder importIcebergTable( input = new GZIPInputStream(input); } tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class); - } catch (IOException e) { - throw new IOException( + table = entityObjForContent(content, tableMetadata, entityObjId); + } catch (Exception e) { + throw new RuntimeException( "Failed to read table metadata from " + content.getMetadataLocation(), e); } - NessieTable table = entityObjForContent(content, tableMetadata, entityObjId); - snapshot = icebergTableSnapshotToNessie( snapshotId, @@ -176,7 +162,7 @@ private EntitySnapshotObj.Builder importIcebergTable( } private EntitySnapshotObj.Builder importIcebergView( - IcebergView content, NessieViewSnapshot snapshot) throws Exception { + IcebergView content, NessieViewSnapshot snapshot) { NessieId snapshotId = objIdToNessieId(taskRequest.objId()); LOGGER.debug( @@ -193,6 +179,7 @@ private EntitySnapshotObj.Builder importIcebergView( // snapshot!=null means that we already have the snapshot (via a committing operation - like a // table update) and do not need to import it but can just store it. 
if (snapshot == null) { + NessieView view; IcebergViewMetadata viewMetadata; StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation()); try { @@ -202,23 +189,22 @@ private EntitySnapshotObj.Builder importIcebergView( input = new GZIPInputStream(input); } viewMetadata = IcebergJson.objectMapper().readValue(input, IcebergViewMetadata.class); - } catch (IOException e) { - throw new IOException( + view = + entityObjForContent( + content, + viewMetadata, + entityObjId, + () -> + viewMetadata.versions().stream() + .filter(v -> v.versionId() == viewMetadata.currentVersionId()) + .findFirst() + .orElseThrow() + .timestampMs()); + } catch (Exception e) { + throw new RuntimeException( "Failed to read view metadata from " + content.getMetadataLocation(), e); } - NessieView view = - entityObjForContent( - content, - viewMetadata, - entityObjId, - () -> - viewMetadata.versions().stream() - .filter(v -> v.versionId() == viewMetadata.currentVersionId()) - .findFirst() - .orElseThrow() - .timestampMs()); - snapshot = icebergViewSnapshotToNessie(snapshotId, null, view, viewMetadata); } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java index 2583414aecc..aa6dbefc48d 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java @@ -18,17 +18,11 @@ import static org.projectnessie.catalog.model.id.NessieId.emptyNessieId; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromByteAccessor; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromLongs; -import static org.projectnessie.nessie.tasks.api.TaskState.failureState; -import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static 
org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteAccessor; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromLongs; import static org.projectnessie.versioned.storage.common.persist.ObjId.zeroLengthObjId; -import java.util.Optional; -import java.util.function.Function; -import org.projectnessie.catalog.files.api.ObjectIOException; import org.projectnessie.catalog.model.id.NessieId; -import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjId; final class Util { @@ -55,39 +49,4 @@ static NessieId objIdToNessieId(ObjId id) { return nessieIdFromByteAccessor(id.size(), id::byteAt); } } - - static TaskState throwableAsErrorTaskState(Throwable throwable) { - return anyCauseMatches( - throwable, - e -> { - if (e instanceof ObjectIOException) { - return ((ObjectIOException) e) - .retryNotBefore() - .map(notBefore -> retryableErrorState(notBefore, e.toString())) - .orElse(null); - } - return null; - }) - .orElseGet(() -> failureState(throwable.toString())); - } - - static Optional anyCauseMatches( - Throwable throwable, Function mappingPredicate) { - if (throwable == null) { - return Optional.empty(); - } - for (Throwable e = throwable; e != null; e = e.getCause()) { - R r = mappingPredicate.apply(e); - if (r != null) { - return Optional.of(r); - } - for (Throwable sup : e.getSuppressed()) { - r = mappingPredicate.apply(sup); - if (r != null) { - return Optional.of(r); - } - } - } - return Optional.empty(); - } } diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java index cf0c4d74212..800521238b9 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java +++ 
b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/AbstractCatalogService.java @@ -32,6 +32,7 @@ import static org.projectnessie.services.authz.AbstractBatchAccessChecker.NOOP_ACCESS_CHECKER; import java.time.Clock; +import java.time.Duration; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -49,6 +50,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.projectnessie.api.v2.params.ParsedReference; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.files.s3.S3BucketOptions; import org.projectnessie.catalog.files.s3.S3ClientSupplier; import org.projectnessie.catalog.files.s3.S3Clients; @@ -186,6 +188,15 @@ private void setupCatalogService() { catalogService.persist = persist; catalogService.executor = executor; catalogService.nessieApi = api; + + ObjectIOExceptionMapper exceptionMapper = + ObjectIOExceptionMapper.builder() + .clock(Clock.systemUTC()) + .retryAfterThrottled(Duration.ofMillis(1)) + .retryAfterNetworkError(Duration.ofMillis(1)) + .reattemptAfterFetchError(Duration.ofMillis(1)) + .build(); + catalogService.snapshotTaskBehavior = new EntitySnapshotTaskBehavior(exceptionMapper); } private void setupObjectIO() { @@ -212,7 +223,7 @@ private void setupObjectIO() { names.stream() .collect(Collectors.toMap(k -> k, k -> Map.of("secret", "secret")))), sessions); - objectIO = new S3ObjectIO(clientSupplier, Clock.systemUTC()); + objectIO = new S3ObjectIO(clientSupplier); } private void setupObjectStorage() { diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java index 364a42060b0..6b644efd286 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java +++ 
b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.time.Clock; +import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +44,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.files.local.LocalObjectIO; import org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; @@ -106,8 +108,22 @@ public void retryImportManifests() { @MethodSource("icebergTableImports") public void icebergTableImports( @SuppressWarnings("unused") String testName, String icebergTableMetadata) throws Exception { + ObjectIOExceptionMapper exceptionMapper = + ObjectIOExceptionMapper.builder() + .clock(Clock.systemUTC()) + .retryAfterThrottled(Duration.ofMillis(1)) + .retryAfterNetworkError(Duration.ofMillis(1)) + .reattemptAfterFetchError(Duration.ofMillis(1)) + .build(); + ObjectIO objectIO = new LocalObjectIO(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = + new IcebergStuff( + objectIO, + persist, + tasksService, + new EntitySnapshotTaskBehavior(exceptionMapper), + executor); ObjId snapshotId = randomObjId(); IcebergTable icebergTable = diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java index 55f251274f5..7e4e8da3f65 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java +++ 
b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java @@ -15,18 +15,13 @@ */ package org.projectnessie.catalog.service.impl; -import static java.time.temporal.ChronoUnit.MINUTES; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.catalog.model.id.NessieId.emptyNessieId; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromBytes; -import static org.projectnessie.nessie.tasks.api.TaskState.failureState; -import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray; import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; import static org.projectnessie.versioned.storage.common.persist.ObjId.zeroLengthObjId; -import java.time.Instant; -import java.util.Optional; import java.util.stream.IntStream; import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; @@ -36,10 +31,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.catalog.files.api.ObjectIOException; import org.projectnessie.catalog.model.id.NessieId; -import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjId; @ExtendWith(SoftAssertionsExtension.class) @@ -144,52 +136,4 @@ static Stream nessieIdObjId() { .mapToObj(x -> randomObjId()) .map(objId -> arguments(nessieIdFromBytes(objId.asByteArray()), objId))); } - - @ParameterizedTest - @MethodSource - public void anyCauseMatches(Throwable throwable, Instant expected) { - soft.assertThat( - Util.anyCauseMatches( - throwable, - t -> - t instanceof ObjectIOException - ? 
((ObjectIOException) t).retryNotBefore().orElse(null) - : null)) - .isEqualTo(Optional.ofNullable(expected)); - } - - @ParameterizedTest - @MethodSource("anyCauseMatches") - public void throwableAsErrorTaskState(Throwable throwable, Instant expected) { - TaskState taskState = Util.throwableAsErrorTaskState(throwable); - - soft.assertThat(taskState) - .isEqualTo( - expected != null - ? retryableErrorState(expected, BackendThrottledException.class.getName() + ": foo") - : failureState(throwable.toString())); - } - - static Stream anyCauseMatches() { - Instant retryNotBefore = Instant.now().plus(42, MINUTES); - BackendThrottledException throttled = new BackendThrottledException(retryNotBefore, "foo"); - - Exception sup1 = new Exception(); - sup1.addSuppressed(throttled); - - Exception sup2 = new Exception(); - sup2.addSuppressed(new RuntimeException()); - sup2.addSuppressed(throttled); - - return Stream.of( - arguments(throttled, retryNotBefore), - arguments(new Exception(throttled), retryNotBefore), - arguments(new Exception(new Exception(throttled)), retryNotBefore), - arguments(new Exception(), null), - arguments(new Exception(new Exception()), null), - arguments(sup1, retryNotBefore), - arguments(sup2, retryNotBefore), - arguments(new Exception(sup1), retryNotBefore), - arguments(new Exception(sup2), retryNotBefore)); - } } diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java index 51140966f13..9b6c3cdd00f 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java @@ -28,8 +28,10 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletionException; +import java.util.Optional; import 
java.util.stream.Collectors; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import org.projectnessie.catalog.files.api.ObjectIOStatus; import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse; import org.projectnessie.catalog.formats.iceberg.rest.IcebergException; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; @@ -55,29 +57,34 @@ public class IcebergErrorMapper { private static final Logger LOGGER = LoggerFactory.getLogger(IcebergErrorMapper.class); private final ExceptionConfig exceptionConfig; + private final ObjectIOExceptionMapper ioExceptionMapper; @Inject - public IcebergErrorMapper(ExceptionConfig exceptionConfig) { + public IcebergErrorMapper( + ExceptionConfig exceptionConfig, ObjectIOExceptionMapper ioExceptionMapper) { this.exceptionConfig = exceptionConfig; + this.ioExceptionMapper = ioExceptionMapper; } public Response toResponse(Throwable ex, IcebergEntityKind kind) { - if (ex instanceof CompletionException) { - ex = ex.getCause(); - } - - if (ex.getClass() == RuntimeException.class && ex.getCause() != null) { - ex = ex.getCause(); + IcebergErrorResponse body = null; + for (Throwable th = ex; th != null; th = th.getCause()) { + if (th instanceof BaseNessieClientServerException) { + BaseNessieClientServerException e = (BaseNessieClientServerException) th; + body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind); + break; + } else if (th instanceof IllegalArgumentException) { + body = errorResponse(400, "IllegalArgumentException", th.getMessage(), th); + break; + } else if (th instanceof IcebergException) { + body = ((IcebergException) th).toErrorResponse(); + break; + } } - IcebergErrorResponse body = null; - if (ex instanceof BaseNessieClientServerException) { - BaseNessieClientServerException e = (BaseNessieClientServerException) ex; - body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind); - } else if (ex instanceof IllegalArgumentException) { - 
body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex); - } else if (ex instanceof IcebergException) { - body = ((IcebergException) ex).toErrorResponse(); + Optional status = ioExceptionMapper.analyze(ex); + if (status.isPresent()) { + body = mapStorageFailure(status.get(), ex); } if (body == null) { @@ -89,6 +96,29 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) { return Response.status(code == null ? 500 : code).entity(body).build(); } + private static String message(ObjectIOStatus status, Throwable ex) { + return String.format("%s (due to: %s)", ex.getMessage(), status.cause().toString()); + } + + private IcebergErrorResponse mapStorageFailure(ObjectIOStatus status, Throwable ex) { + // Log full stack trace on the server side for troubleshooting + LOGGER.info("Propagating storage failure to client: {}", ex, ex); + + int httpStatusCode = status.httpStatusCode(); + switch (httpStatusCode) { + case 401: + return errorResponse(httpStatusCode, "NotAuthorizedException", message(status, ex), ex); + case 403: + return errorResponse(httpStatusCode, "ForbiddenException", message(status, ex), ex); + case 404: + // Convert storage-side "not found" into `IllegalArgumentException`. + // In most cases this results from bad locations in Iceberg metadata. 
+ return errorResponse(400, "IllegalArgumentException", message(status, ex), ex); + default: + return null; + } + } + private IcebergErrorResponse mapNessieError( Exception ex, ErrorCode err, NessieErrorDetails errorDetails, IcebergEntityKind kind) { switch (err) { diff --git a/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java b/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java index cf6adb00e4b..6821a47dc22 100644 --- a/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java +++ b/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -134,7 +133,7 @@ protected static void setupObjectStoreAndNessie( .collect(Collectors.toMap(k -> k, k -> Map.of("secret", "secret")))), sessions); - ObjectIO objectIO = new S3ObjectIO(clientSupplier, Clock.systemUTC()); + ObjectIO objectIO = new S3ObjectIO(clientSupplier); tableOneMetadataLocation = generateMetadataWithManifestList( diff --git a/servers/quarkus-common/build.gradle.kts b/servers/quarkus-common/build.gradle.kts index 3a7ff79dffb..631fe49b78a 100644 --- a/servers/quarkus-common/build.gradle.kts +++ b/servers/quarkus-common/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { implementation(project(":nessie-services")) implementation(project(":nessie-services-config")) implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-catalog-files-api")) implementation(project(":nessie-catalog-files-impl")) implementation(project(":nessie-catalog-service-common")) implementation(project(":nessie-catalog-secrets-api")) diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java index 
6ddfd0604fe..3b4ccb6ae6d 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java @@ -16,7 +16,6 @@ package org.projectnessie.quarkus.config; import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; import java.nio.file.Path; import java.time.Duration; @@ -39,10 +38,6 @@ */ @ConfigMapping(prefix = "nessie.catalog.service.s3") public interface CatalogS3Config extends S3Config, S3Options { - @WithName("throttled-retry-after") - @WithDefault("PT10S") - @Override - Optional retryAfter(); @WithName("http.expect-continue-enabled") @Override diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java index b91c0171e8f..f4ae6dd9182 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java @@ -22,6 +22,30 @@ @ConfigMapping(prefix = "nessie.catalog.service") public interface CatalogServiceConfig { + /** + * Interval after which a request is retried when storage I/O responds with some "retry later" + * response. + */ + @WithName("throttled-retry-after") // TODO: update helm helpers + @WithDefault("PT10S") + Duration retryAfterThrottled(); + + /** + * Interval after which a request is retried in case of network / routing errors (e.g. "Service + * Unavailable"). + */ + @WithName("network-error-retry-after") + @WithDefault("PT30S") + Duration retryAfterNetworkError(); + + /** + * Interval after which new requests for data that previously failed to be retrieved from storage + * can be re-attempted. 
+ */ + @WithName("reattempt-after-fetch-error") + @WithDefault("PT60S") + Duration reattemptAfterFetchError(); + /** Advanced property, defines the maximum number of concurrent imports from object stores. */ @WithName("imports.max-concurrent") @WithDefault("32") diff --git a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java index db512c910fe..b1a1dadba38 100644 --- a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java +++ b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java @@ -27,6 +27,7 @@ import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterAll; @@ -97,6 +98,7 @@ void testCreateTableForbidden() { Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); // Attempts to create files blocked by the session IAM policy break the createTable() call assertThatThrownBy(() -> catalog.createTable(TableIdentifier.of(ns, "table1"), schema)) + .isInstanceOf(ForbiddenException.class) .hasMessageContaining("S3Exception: Access Denied") // make sure the error comes from the Catalog Server .hasStackTraceContaining("org.apache.iceberg.rest.RESTClient"); diff --git a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java index 801593209ce..182d23d32ec 100644 --- a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java +++ 
b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java @@ -43,24 +43,29 @@ import org.projectnessie.catalog.files.ResolvingObjectIO; import org.projectnessie.catalog.files.adls.AdlsClientSupplier; import org.projectnessie.catalog.files.adls.AdlsClients; +import org.projectnessie.catalog.files.adls.AdlsExceptionMapper; import org.projectnessie.catalog.files.adls.AdlsFileSystemOptions; import org.projectnessie.catalog.files.adls.AdlsOptions; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.files.api.RequestSigner; import org.projectnessie.catalog.files.gcs.GcsBucketOptions; import org.projectnessie.catalog.files.gcs.GcsClients; +import org.projectnessie.catalog.files.gcs.GcsExceptionMapper; import org.projectnessie.catalog.files.gcs.GcsOptions; import org.projectnessie.catalog.files.gcs.GcsStorageSupplier; import org.projectnessie.catalog.files.s3.S3BucketOptions; import org.projectnessie.catalog.files.s3.S3ClientSupplier; import org.projectnessie.catalog.files.s3.S3Clients; import org.projectnessie.catalog.files.s3.S3CredentialsResolver; +import org.projectnessie.catalog.files.s3.S3ExceptionMapper; import org.projectnessie.catalog.files.s3.S3Options; import org.projectnessie.catalog.files.s3.S3Sessions; import org.projectnessie.catalog.files.s3.S3SessionsManager; import org.projectnessie.catalog.files.s3.S3Signer; import org.projectnessie.catalog.secrets.SecretsProvider; import org.projectnessie.catalog.service.config.CatalogConfig; +import org.projectnessie.catalog.service.impl.EntitySnapshotTaskBehavior; import org.projectnessie.client.api.NessieApiV2; import org.projectnessie.nessie.combined.CombinedClientBuilder; import org.projectnessie.nessie.tasks.async.TasksAsync; @@ -174,6 +179,7 @@ public HttpClient adlsHttpClient(CatalogAdlsConfig adlsConfig) { @Produces @Singleton public ObjectIO objectIO( + 
ObjectIOExceptionMapper exceptionMapper, CatalogS3Config s3config, @CatalogS3Client SdkHttpClient sdkClient, CatalogAdlsConfig adlsConfig, @@ -201,6 +207,27 @@ public RequestSigner signer( return new S3Signer(s3config, secretsProvider, s3sessions); } + @Produces + @Singleton + public EntitySnapshotTaskBehavior entitySnapshotTaskBehavior( + CatalogServiceConfig config, ObjectIOExceptionMapper mapper) { + return new EntitySnapshotTaskBehavior(mapper); + } + + @Produces + @Singleton + public ObjectIOExceptionMapper objectIOExceptionMapper(CatalogServiceConfig config) { + return ObjectIOExceptionMapper.builder() + .clock(systemUTC()) + .retryAfterThrottled(config.retryAfterThrottled()) + .retryAfterNetworkError(config.retryAfterNetworkError()) + .reattemptAfterFetchError(config.reattemptAfterFetchError()) + .addAnalyzer(AdlsExceptionMapper.INSTANCE) + .addAnalyzer(GcsExceptionMapper.INSTANCE) + .addAnalyzer(S3ExceptionMapper.INSTANCE) + .build(); + } + /** * Provides the {@link TasksAsync} instance backed by a thread-pool executor configured according * to {@link CatalogServiceConfig}, with thread-context propagation. 
diff --git a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java index 7de337312b5..04143be4323 100644 --- a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java +++ b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java @@ -44,6 +44,9 @@ public void taskAttemptRunning() {} @Override public void taskAttemptErrorRetry() {} + @Override + public void taskAttemptRecover() {} + @Override public void taskCreation() {} diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java index 78a5f7f8022..3e5cc86f6c2 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java @@ -17,6 +17,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.InstanceOfAssertFactories.STRING; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -35,17 +36,26 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.IntStream; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.RESTSerializers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.projectnessie.client.api.NessieApiV2; 
+import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.objectstoragemock.HeapStorageBucket; +import org.projectnessie.server.catalog.ObjectStorageMockTestResourceLifecycleManager.AccessCheckHandlerHolder; public abstract class AbstractIcebergCatalogUnitTests extends AbstractIcebergCatalogTests { HeapStorageBucket heapStorageBucket; + AccessCheckHandlerHolder accessCheckHandler; @BeforeEach public void clearBucket() { @@ -205,4 +215,54 @@ public void namespacesPaging() throws Exception { .mapToObj(i -> TableIdentifier.of("namespace_0", "view_" + i)) .toList()); } + + @Test + void testStorageReadFailure() throws Exception { + @SuppressWarnings("resource") + RESTCatalog catalog = catalog(); + + TableIdentifier id1 = TableIdentifier.of("ns", "table1"); + catalog.createNamespace(id1.namespace()); + Table table = catalog.buildTable(id1, SCHEMA).create(); + + try (NessieApiV2 api = nessieClientBuilder().build(NessieApiV2.class)) { + api.createNamespace().reference(api.getDefaultBranch()).namespace("test-ns").create(); + api.commitMultipleOperations() + .commitMeta(CommitMeta.fromMessage("test")) + .branch(api.getDefaultBranch()) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table2"), + IcebergTable.of(table.location() + "_test_access_denied_file", 1, 2, 3, 4))) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table3"), + IcebergTable.of(table.location() + "_test_non_existent_file", 1, 2, 3, 4))) + .commit(); + } + + accessCheckHandler.set(key -> !key.contains("_test_access_denied_")); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table2"))) + .isInstanceOf(ForbiddenException.class) + .hasMessageContaining("_test_access_denied_file"); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table3"))) + .isInstanceOf(IllegalArgumentException.class) + 
.hasMessageContaining("_test_non_existent_file"); + } + + @Test + void testStorageWriteFailure() throws Exception { + @SuppressWarnings("resource") + RESTCatalog catalog = catalog(); + + TableIdentifier id1 = TableIdentifier.of("ns", "table_access_denied"); + catalog.createNamespace(id1.namespace()); + + accessCheckHandler.set(key -> !key.contains("table_access_denied")); + assertThatThrownBy(() -> catalog.buildTable(id1, SCHEMA).create()) + .isInstanceOf(ForbiddenException.class) + .hasMessageContaining("table_access_denied"); + } } diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java index be62f84fc27..43c3aaf6146 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java @@ -19,6 +19,7 @@ import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.projectnessie.objectstoragemock.AccessCheckHandler; import org.projectnessie.objectstoragemock.HeapStorageBucket; import org.projectnessie.objectstoragemock.ObjectStorageMock; import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer; @@ -45,6 +46,7 @@ public static String bucketWarehouseLocation(String scheme) { "ObjectStorageMockTestResourceLifecycleManager.initAddress"; private final AssumeRoleHandlerHolder assumeRoleHandler = new AssumeRoleHandlerHolder(); + private final AccessCheckHandlerHolder accessCheckHandler = new AccessCheckHandlerHolder(); private HeapStorageBucket heapStorageBucket; private MockServer server; @@ -58,6 +60,7 @@ public Map start() { 
.initAddress("localhost") .putBuckets(BUCKET, heapStorageBucket.bucket()) .assumeRoleHandler(assumeRoleHandler) + .accessCheckHandler(accessCheckHandler) .build() .start(); @@ -92,6 +95,9 @@ public void inject(TestInjector testInjector) { testInjector.injectIntoFields( assumeRoleHandler, new TestInjector.MatchesType(AssumeRoleHandlerHolder.class)); + + testInjector.injectIntoFields( + accessCheckHandler, new TestInjector.MatchesType(AccessCheckHandlerHolder.class)); } @Override @@ -137,4 +143,18 @@ public AssumeRoleResult assumeRole( serialNumber); } } + + public static final class AccessCheckHandlerHolder implements AccessCheckHandler { + private final AtomicReference handler = new AtomicReference<>(); + + public void set(AccessCheckHandler handler) { + this.handler.set(handler); + } + + @Override + public boolean accessAllowed(String objectKey) { + AccessCheckHandler delegate = handler.get(); + return delegate == null || delegate.accessAllowed(objectKey); + } + } } diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java index 2803fdb12eb..d6d70e2941d 100644 --- a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java @@ -40,6 +40,8 @@ public interface TaskServiceMetrics { /** Task attempt detected that task ran into a retryable error. */ void taskAttemptErrorRetry(); + void taskAttemptRecover(); + /** New task object is being created in the database. 
*/ void taskCreation(); diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java similarity index 51% rename from catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java rename to testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java index abd38e27cec..78f85527216 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java @@ -13,31 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.projectnessie.catalog.files.api; +package org.projectnessie.objectstoragemock; -import java.time.Instant; -import java.util.Optional; - -public class NonRetryableException extends ObjectIOException { - public NonRetryableException(Throwable cause) { - super(cause); - } - - public NonRetryableException(String message) { - super(message); - } - - public NonRetryableException(String message, Throwable cause) { - super(message, cause); - } - - @Override - public boolean isRetryable() { - return false; - } - - @Override - public Optional retryNotBefore() { - return Optional.empty(); - } +public interface AccessCheckHandler { + boolean accessAllowed(String objectKey); } diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java index b59f1e921b0..1592c3a5aab 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java 
@@ -84,6 +84,7 @@ public Response create( return withFilesystem( filesystem, + normalizedPath, b -> { Bucket.ObjectUpdater updater = b.updater().update(normalizedPath, Bucket.UpdaterMode.CREATE_NEW); @@ -116,6 +117,7 @@ public Response update( return withFilesystem( filesystem, + normalizedPath, b -> { if (!action.appendOrFlush()) { return notImplemented(); @@ -151,6 +153,7 @@ public Response read( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -194,6 +197,7 @@ public Response getProperties( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -225,6 +229,7 @@ public Response delete( return withFilesystem( filesystem, + normalizedPath, b -> { if (recursive) { try (Stream listStream = @@ -262,6 +267,7 @@ public Response list( return withFilesystem( filesystem, + normalizedPath, b -> { try (Stream listStream = b.lister().list(normalizedPath, continuationToken)) { @@ -349,6 +355,10 @@ private static Response keyNotFound() { Status.NOT_FOUND, "PathNotFound", "The specified path does not exist."); } + private static Response accessDenied() { + return dataLakeStorageError(Status.FORBIDDEN, "Forbidden", "Access Denied."); + } + private static Response dataLakeStorageError(Status status, String code, String message) { return Response.status(status) .header("x-ms-error-code", code) @@ -361,7 +371,12 @@ private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } - private Response withFilesystem(String filesystem, Function worker) { + private Response withFilesystem( + String filesystem, String path, Function worker) { + if (!mockServer.accessCheckHandler().accessAllowed(path)) { + return accessDenied(); + } + Bucket bucket = mockServer.buckets().get(filesystem); if (bucket == null) { return bucketNotFound(); diff --git 
a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java index 5ac68c901a9..0dc10735edd 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java @@ -174,6 +174,7 @@ public Response deleteObject( @PathParam("bucketName") String bucketName, @PathParam("object") String objectName) { return withBucket( bucketName, + objectName, b -> { if (!b.deleter().delete(objectName)) { return keyNotFound(); @@ -277,6 +278,7 @@ public Response insertObject( InputStream stream) { return withBucket( bucketName, + objectName, bucket -> { try { Bucket.Updater updater = bucket.updater(); @@ -380,6 +382,10 @@ private static Response keyNotFound() { return errorResponse(Status.NOT_FOUND, "The specified key does not exist."); } + private static Response accessDenied() { + return errorResponse(Status.FORBIDDEN, "Access Denied."); + } + private static Response errorResponse(Status status, String message) { return Response.status(status) .type(MediaType.APPLICATION_JSON) @@ -400,10 +406,25 @@ private Response withBucket(String bucketName, Function worker return worker.apply(bucket); } + private Response withBucket( + String bucketName, String objectName, Function worker) { + Bucket bucket = mockServer.buckets().get(bucketName); + if (bucket == null) { + return bucketNotFound(); + } + + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + + return worker.apply(bucket); + } + private Response withBucketObject( String bucketName, String objectName, Function worker) { return withBucket( bucketName, + objectName, bucket -> { MockObject o = bucket.object().retrieve(objectName); if (o == null) { diff --git 
a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java index b40491a53c6..36a8730c637 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java @@ -91,6 +91,11 @@ public AssumeRoleHandler assumeRoleHandler() { .build()); } + @Value.Default + public AccessCheckHandler accessCheckHandler() { + return (key) -> true; + } + public interface MockServer extends AutoCloseable { URI getS3BaseUri(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java index ff0f3eeb4fe..7e7b87f5fef 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java @@ -266,6 +266,10 @@ public Response batchDeleteObjects( b -> { ImmutableBatchDeleteResponse.Builder response = ImmutableBatchDeleteResponse.builder(); for (S3ObjectIdentifier s3ObjectIdentifier : body.objectsToDelete()) { + if (!mockServer.accessCheckHandler().accessAllowed(s3ObjectIdentifier.key())) { + return accessDenied(); + } + if (b.deleter().delete(s3ObjectIdentifier.key())) { response.addDeletedObjects( ImmutableDeletedS3Object.builder() @@ -308,6 +312,7 @@ public Response deleteObject( @PathParam("bucketName") String bucketName, @PathParam("object") String objectName) { return withBucket( bucketName, + objectName, b -> { b.deleter().delete(objectName); return noContent(); @@ -378,6 +383,7 @@ public Response putObject( ) { return withBucket( bucketName, + objectName, bucket -> { try { @@ -461,6 +467,13 @@ private static Response 
keyNotFound() { .build(); } + private static Response accessDenied() { + return Response.status(Status.FORBIDDEN) + .type(MediaType.APPLICATION_XML_TYPE) + .entity(ErrorResponse.of("AccessDenied", "Access Denied.")) + .build(); + } + private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } @@ -473,10 +486,25 @@ private Response withBucket(String bucketName, Function worker return worker.apply(bucket); } + private Response withBucket( + String bucketName, String objectName, Function worker) { + Bucket bucket = mockServer.buckets().get(bucketName); + if (bucket == null) { + return bucketNotFound(); + } + + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + + return worker.apply(bucket); + } + private Response withBucketObject( String bucketName, String objectName, Function worker) { return withBucket( bucketName, + objectName, bucket -> { MockObject o = bucket.object().retrieve(objectName); if (o == null) {