From 19e964556a82ccb559752f0add1443f6a1e91c76 Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Sun, 9 Jun 2024 21:15:21 -0400 Subject: [PATCH] * Add exception mappers to convert storage failures to Iceberg REST client exceptions * Attempt recovery from failed tasks if/when they are re-submitted after a configurable delay. * Add task deadlines (prevent indefinite retries) * Add AccessCheckHandler to ObjectStorageMock for simulating access failures in tests. Closes #8738 --- .../files/api/BackendThrottledException.java | 84 ------- ...ableException.java => ObjectIOConfig.java} | 39 ++-- .../files/api/ObjectIOExceptionMapper.java | 93 ++++++++ ...ctIOException.java => ObjectIOStatus.java} | 24 +- .../catalog/files/api/RetryableException.java | 46 ---- .../api/TestBackendThrottledException.java | 129 ----------- .../files/s3/S3ClientResourceBench.java | 7 +- .../catalog/files/ResolvingObjectIO.java | 3 +- .../files/adls/AdlsExceptionMapper.java | 34 +++ .../catalog/files/gcs/GcsExceptionMapper.java | 36 +++ .../catalog/files/s3/S3ClientSupplier.java | 4 - .../catalog/files/s3/S3Config.java | 8 - .../catalog/files/s3/S3ExceptionMapper.java | 35 +++ .../catalog/files/s3/S3ObjectIO.java | 19 +- .../catalog/files/s3/TestS3Clients.java | 3 +- .../catalog/files/s3/TestS3ObjectIO.java | 76 ------- .../service/impl/CatalogServiceImpl.java | 33 +-- .../impl/EntitySnapshotTaskBehavior.java | 48 +++- .../impl/EntitySnapshotTaskRequest.java | 8 +- .../catalog/service/impl/IcebergStuff.java | 19 +- .../service/impl/ImportSnapshotWorker.java | 62 ++--- .../catalog/service/impl/Util.java | 41 ---- .../service/impl/TestIcebergStuff.java | 18 +- .../catalog/service/impl/TestUtil.java | 56 ----- .../service/rest/IcebergErrorMapper.java | 56 +++-- .../nessie/cli/commands/WithNessie.java | 3 +- servers/quarkus-common/build.gradle.kts | 1 + .../quarkus/config/CatalogS3Config.java | 5 - .../quarkus/config/CatalogServiceConfig.java | 27 ++- .../s3/ITS3AssumeRoleIcebergCatalog.java | 2 + .../server/catalog/CatalogProducers.java | 27 +++ .../catalog/MonitoredTaskServiceMetrics.java | 3 + .../AbstractIcebergCatalogUnitTests.java | 60 +++++ ...orageMockTestResourceLifecycleManager.java | 20 ++ .../nessie/tasks/api/TaskBehavior.java | 8 +- .../nessie/tasks/api/TaskState.java | 51 ++++- .../service/impl/TaskServiceMetrics.java | 2 + .../tasks/service/impl/TasksServiceImpl.java | 58 ++++- .../service/impl/TestTasksServiceImpl.java | 214 ++++++++++++++++++ .../service/tasktypes/BasicTaskBehavior.java | 33 ++- .../objectstoragemock/AccessCheckHandler.java | 20 ++ .../objectstoragemock/AdlsGen2Resource.java | 17 +- .../objectstoragemock/GcsResource.java | 21 ++ .../objectstoragemock/ObjectStorageMock.java | 5 + .../objectstoragemock/S3Resource.java | 28 +++ .../tools/admin/cli/ITDeleteCatalogTasks.java | 10 +- 46 files changed, 962 insertions(+), 634 deletions(-) delete mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java rename catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/{NonRetryableException.java => ObjectIOConfig.java} (53%) create mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java rename catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/{ObjectIOException.java => ObjectIOStatus.java} (60%) delete mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java delete mode 100644 catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java delete mode 100644 catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java create mode 100644 testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java deleted file mode 100644 index 09862bb27b2..00000000000 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; -import static java.time.temporal.ChronoUnit.SECONDS; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.format.DateTimeParseException; -import java.util.Optional; -import java.util.function.Function; - -public class BackendThrottledException extends RetryableException { - public BackendThrottledException(Instant retryNotBefore, Throwable cause) { - super(retryNotBefore, cause); - } - - public BackendThrottledException(Instant retryNotBefore, String message) { - super(retryNotBefore, message); - } - - public BackendThrottledException(Instant retryNotBefore, String message, Throwable cause) { - super(retryNotBefore, message, cause); - } - - /** - * Provides a {@link BackendThrottledException} instance for HTTP responses, for the HTTP status - * codes 429 (too many requests) + 503 (service unavailable), handling HTTP responses having the - * {@code Retry-After} header. - */ - public static Optional fromHttpStatusCode( - int statusCode, - Function headers, - Clock clock, - Function exceptionSupplier, - Duration defaultRetryAfter) { - switch (statusCode) { - case 429: - case 503: - String retryAfter = headers.apply("Retry-After"); - Instant retryNotBefore = null; - Instant now = clock.instant(); - if (retryAfter != null) { - try { - int seconds = Integer.parseInt(retryAfter); - if (seconds > 0) { - retryNotBefore = now.plus(seconds, SECONDS); - } - } catch (NumberFormatException e) { - try { - retryNotBefore = Instant.from(RFC_1123_DATE_TIME.parse(retryAfter)); - if (retryNotBefore.compareTo(now) <= 0) { - retryNotBefore = null; - } - } catch (DateTimeParseException e2) { - // - } - } - } - if (retryNotBefore == null) { - retryNotBefore = now.plus(defaultRetryAfter); - } - return Optional.of(exceptionSupplier.apply(retryNotBefore)); - default: - break; - } - return Optional.empty(); - } -} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOConfig.java similarity index 53% rename from catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java rename to catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOConfig.java index abd38e27cec..ef810790444 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOConfig.java @@ -15,29 +15,24 @@ */ package org.projectnessie.catalog.files.api; -import java.time.Instant; -import java.util.Optional; +import java.time.Duration; -public class NonRetryableException extends ObjectIOException { - public NonRetryableException(Throwable cause) { - super(cause); - } +public interface ObjectIOConfig { + /** + * Interval after which a request is retried when storage I/O responds with some "retry later" + * response. + */ + Duration retryAfterThrottled(); - public NonRetryableException(String message) { - super(message); - } + /** + * Interval after which a request is retried in case of networks / routing errors (e.g. "Service + * Unavailable"). + */ + Duration retryAfterNetworkError(); - public NonRetryableException(String message, Throwable cause) { - super(message, cause); - } - - @Override - public boolean isRetryable() { - return false; - } - - @Override - public Optional retryNotBefore() { - return Optional.empty(); - } + /** + * Interval after which new requests for data that previously failed to be retrieved from storage + * can be re-attempted. + */ + Duration reattemptAfterFetchError(); } diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java new file mode 100644 index 00000000000..1a580ca4f21 --- /dev/null +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.api; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public abstract class ObjectIOExceptionMapper { + + protected abstract List analyzers(); + + protected abstract Duration retryAfterThrottled(); + + protected abstract Duration retryAfterNetworkError(); + + protected abstract Duration reattemptAfterFetchError(); + + protected abstract Clock clock(); + + public static ImmutableObjectIOExceptionMapper.Builder builder() { + return ImmutableObjectIOExceptionMapper.builder(); + } + + public Optional analyze(Throwable ex) { + for (Throwable th = ex; th != null; th = th.getCause()) { + for (Analyzer analyzer : analyzers()) { + OptionalInt statusCode = analyzer.httpStatusCode(th); + if (statusCode.isPresent()) { + return Optional.of(toStatus(statusCode.getAsInt())); + } + } + } + + return Optional.empty(); + } + + public OptionalInt httpStatusCode(Throwable ex) { + for (Throwable th = ex; th != null; th = th.getCause()) { + for (Analyzer analyzer : analyzers()) { + OptionalInt statusCode = analyzer.httpStatusCode(th); + if (statusCode.isPresent()) { + return statusCode; + } + } + } + + return OptionalInt.empty(); + } + + private Instant retryAfter(Duration delay) { + return clock().instant().plus(delay); + } + + private ObjectIOStatus toStatus(int httpStatus) { + switch (httpStatus) { + case 408: + case 425: + case 429: + return ObjectIOStatus.of(httpStatus, true, retryAfter(retryAfterThrottled())); + + case 502: + case 503: + case 504: + return ObjectIOStatus.of(httpStatus, true, retryAfter(retryAfterNetworkError())); + + default: + return ObjectIOStatus.of(httpStatus, false, retryAfter(reattemptAfterFetchError())); + } + } + + public interface Analyzer { + OptionalInt httpStatusCode(Throwable th); + } +} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java similarity index 60% rename from catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java rename to catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java index 45e0d58c30d..4673bb7c8e4 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOStatus.java @@ -15,24 +15,18 @@ */ package org.projectnessie.catalog.files.api; -import java.io.IOException; import java.time.Instant; -import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; -public abstract class ObjectIOException extends IOException { - public ObjectIOException(Throwable cause) { - super(cause); - } +@NessieImmutable +public interface ObjectIOStatus { + int httpStatusCode(); - public ObjectIOException(String message) { - super(message); - } + boolean isRetryable(); - public ObjectIOException(String message, Throwable cause) { - super(message, cause); - } + Instant reattemptAfter(); - public abstract boolean isRetryable(); - - public abstract Optional retryNotBefore(); + static ObjectIOStatus of(int httpCode, boolean retryable, Instant reattemptAfter) { + return ImmutableObjectIOStatus.of(httpCode, retryable, reattemptAfter); + } } diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java deleted file mode 100644 index 4d8e4a693b0..00000000000 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import java.time.Instant; -import java.util.Optional; - -public class RetryableException extends ObjectIOException { - private final Instant retryNotBefore; - - public RetryableException(Instant retryNotBefore, Throwable cause) { - super(cause); - this.retryNotBefore = retryNotBefore; - } - - public RetryableException(Instant retryNotBefore, String message) { - super(message); - this.retryNotBefore = retryNotBefore; - } - - public RetryableException(Instant retryNotBefore, String message, Throwable cause) { - super(message, cause); - this.retryNotBefore = retryNotBefore; - } - - public boolean isRetryable() { - return true; - } - - public Optional retryNotBefore() { - return Optional.of(retryNotBefore); - } -} diff --git a/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java b/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java deleted file mode 100644 index 8f3e8cc1a5c..00000000000 --- a/catalog/files/api/src/test/java/org/projectnessie/catalog/files/api/TestBackendThrottledException.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.api; - -import static java.time.Clock.systemUTC; -import static java.time.Instant.now; -import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; -import static java.time.temporal.ChronoUnit.DAYS; -import static java.time.temporal.ChronoUnit.SECONDS; -import static org.assertj.core.api.InstanceOfAssertFactories.optional; -import static org.junit.jupiter.params.provider.Arguments.arguments; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; -import java.util.stream.Stream; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -@ExtendWith(SoftAssertionsExtension.class) -public class TestBackendThrottledException { - @InjectSoftAssertions protected SoftAssertions soft; - - @Test - public void nonRetryableHttp() { - for (int i = 0; i <= 999; i++) { - // HTTP 429 and 503 - if (i == 429 || i == 503) { - continue; - } - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - i, - header -> null, - systemUTC(), - instant -> new BackendThrottledException(instant, "hello"), - Duration.of(10, SECONDS))) - .isEmpty(); - } - } - - static Instant now = now().truncatedTo(SECONDS); - - @ParameterizedTest - @MethodSource - public void retryableDefaults(String retryAfterHeader) { - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration fallback = Duration.of(10, SECONDS); - Instant expected = clock.instant().plus(fallback); - - for (int statusCode : new int[] {429, 503}) { - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - statusCode, - header -> retryAfterHeader, - clock, - instant -> new BackendThrottledException(instant, "hello"), - fallback)) - .get() - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(expected); - } - } - - static Stream retryableDefaults() { - return Stream.of( - null, - "dummy", - "0", - "-1", - "" + Integer.MIN_VALUE, - formatInstant(now), - formatInstant(now.minus(1, SECONDS))); - } - - static String formatInstant(Instant instant) { - return RFC_1123_DATE_TIME.format(instant.atZone(ZoneId.of("UTC"))); - } - - @ParameterizedTest - @MethodSource - public void retryableWithHeader(String retryAfterHeader, Instant expected) { - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration fallback = Duration.of(10, SECONDS); - - for (int statusCode : new int[] {429, 503}) { - soft.assertThat( - BackendThrottledException.fromHttpStatusCode( - statusCode, - header -> retryAfterHeader, - clock, - instant -> new BackendThrottledException(instant, "hello"), - fallback)) - .get() - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(expected); - } - } - - static Stream retryableWithHeader() { - return Stream.of( - arguments("42", now.plus(42, SECONDS)), - arguments(formatInstant(now.plus(42, SECONDS)), now.plus(42, SECONDS)), - arguments("86400", now.plus(86400, SECONDS)), - arguments(formatInstant(now.plus(123, DAYS)), now.plus(123, DAYS))); - } -} diff --git a/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java b/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java index 94d7bd1419b..0055b11483f 100644 --- a/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java +++ b/catalog/files/impl/src/jmh/java/org/projectnessie/catalog/files/s3/S3ClientResourceBench.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; -import java.time.Clock; import java.util.Map; import java.util.stream.Collectors; import org.openjdk.jmh.annotations.Benchmark; @@ -105,7 +104,7 @@ public void s3client(BenchmarkParam param, Blackhole bh) { @Benchmark public void s3Get(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/key"))) { bh.consume(in.readAllBytes()); } @@ -113,7 +112,7 @@ public void s3Get(BenchmarkParam param, Blackhole bh) throws IOException { @Benchmark public void s3Get250k(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/s-256000"))) { bh.consume(in.readAllBytes()); } @@ -121,7 +120,7 @@ public void s3Get250k(BenchmarkParam param, Blackhole bh) throws IOException { @Benchmark public void s3Get4M(BenchmarkParam param, Blackhole bh) throws IOException { - S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier, Clock.systemUTC()); + S3ObjectIO objectIO = new S3ObjectIO(param.clientSupplier); try (InputStream in = objectIO.readObject(StorageUri.of("s3://bucket/s-4194304"))) { bh.consume(in.readAllBytes()); } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java index b6fc5ee3114..e229f29eb9e 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/ResolvingObjectIO.java @@ -15,7 +15,6 @@ */ package org.projectnessie.catalog.files; -import java.time.Clock; import org.projectnessie.catalog.files.adls.AdlsClientSupplier; import org.projectnessie.catalog.files.adls.AdlsObjectIO; import org.projectnessie.catalog.files.api.ObjectIO; @@ -34,7 +33,7 @@ public ResolvingObjectIO( S3ClientSupplier s3ClientSupplier, AdlsClientSupplier adlsClientSupplier, GcsStorageSupplier gcsStorageSupplier) { - this.s3ObjectIO = new S3ObjectIO(s3ClientSupplier, Clock.systemUTC()); + this.s3ObjectIO = new S3ObjectIO(s3ClientSupplier); this.gcsObjectIO = new GcsObjectIO(gcsStorageSupplier); this.adlsObjectIO = new AdlsObjectIO(adlsClientSupplier); } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java new file mode 100644 index 00000000000..3d7444c9848 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.adls; + +import com.azure.storage.blob.models.BlobStorageException; +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; + +public class AdlsExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + public static final AdlsExceptionMapper INSTANCE = new AdlsExceptionMapper(); + + private AdlsExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof BlobStorageException) { + return OptionalInt.of(((BlobStorageException) th).getStatusCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java new file mode 100644 index 00000000000..6cbf74617a6 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.gcs; + +import com.google.cloud.storage.StorageException; +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; + +/** Extracts storage-side HTTP status code from GCS client exceptions. */ +public final class GcsExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + + public static final GcsExceptionMapper INSTANCE = new GcsExceptionMapper(); + + private GcsExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof StorageException) { + return OptionalInt.of(((StorageException) th).getCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java index 087a15fba36..f9da1ba7aa3 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ClientSupplier.java @@ -62,10 +62,6 @@ public S3ClientSupplier( this.sessions = sessions; } - public S3Config s3config() { - return s3config; - } - public S3Options s3options() { return s3options; } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java index 0ce27b1ef0d..08f431e3ec1 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3Config.java @@ -55,11 +55,6 @@ public interface S3Config { @ConfigItem(section = "transport") Optional expectContinueEnabled(); - /** - * Interval after which a request is retried when S3 response with some "retry later" response. - */ - Optional retryAfter(); - static Builder builder() { return ImmutableS3Config.builder(); } @@ -86,9 +81,6 @@ interface Builder { @CanIgnoreReturnValue Builder expectContinueEnabled(boolean expectContinueEnabled); - @CanIgnoreReturnValue - Builder retryAfter(Duration retryAfter); - S3Config build(); } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java new file mode 100644 index 00000000000..35b0b94e485 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.s3; + +import java.util.OptionalInt; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import software.amazon.awssdk.core.exception.SdkServiceException; + +public class S3ExceptionMapper implements ObjectIOExceptionMapper.Analyzer { + + public static final S3ExceptionMapper INSTANCE = new S3ExceptionMapper(); + + private S3ExceptionMapper() {} + + @Override + public OptionalInt httpStatusCode(Throwable th) { + if (th instanceof SdkServiceException) { + return OptionalInt.of(((SdkServiceException) th).statusCode()); + } + return OptionalInt.empty(); + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java index 978a591060e..5c738e4d258 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java @@ -16,17 +16,12 @@ package org.projectnessie.catalog.files.s3; import static com.google.common.base.Preconditions.checkArgument; -import static java.time.temporal.ChronoUnit.SECONDS; import static org.projectnessie.catalog.files.s3.S3Utils.isS3scheme; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.time.Clock; -import java.time.Duration; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.catalog.files.api.NonRetryableException; import org.projectnessie.catalog.files.api.ObjectIO; import org.projectnessie.storage.uri.StorageUri; import software.amazon.awssdk.core.exception.SdkServiceException; @@ -38,11 +33,9 @@ public class S3ObjectIO implements ObjectIO { private final S3ClientSupplier s3clientSupplier; - private final Clock clock; - public S3ObjectIO(S3ClientSupplier s3clientSupplier, Clock clock) { + public S3ObjectIO(S3ClientSupplier s3clientSupplier) { this.s3clientSupplier = s3clientSupplier; - this.clock = clock; } @Override @@ -70,15 +63,7 @@ public InputStream readObject(StorageUri uri) throws IOException { .key(withoutLeadingSlash(uri)) .build()); } catch (SdkServiceException e) { - if (e.isThrottlingException()) { - throw new BackendThrottledException( - clock - .instant() - .plus(s3clientSupplier.s3config().retryAfter().orElse(Duration.of(10, SECONDS))), - "S3 throttled", - e); - } - throw new NonRetryableException(e); + throw new IOException(e); } } diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java index d53dba60e54..548f91bc905 100644 --- a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java +++ b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3Clients.java @@ -18,7 +18,6 @@ import static java.util.function.Function.identity; import static org.projectnessie.catalog.secrets.BasicCredentials.basicCredentials; -import java.time.Clock; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; @@ -81,7 +80,7 @@ protected ObjectIO buildObjectIO( names.stream() .collect(Collectors.toMap(identity(), k -> Map.of("secret", "secret")))), null); - return new S3ObjectIO(supplier, Clock.systemUTC()); + return new S3ObjectIO(supplier); } @Override diff --git a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java b/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java deleted file mode 100644 index 00852298754..00000000000 --- a/catalog/files/impl/src/test/java/org/projectnessie/catalog/files/s3/TestS3ObjectIO.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright (C) 2024 Dremio - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.projectnessie.catalog.files.s3; - -import static java.time.temporal.ChronoUnit.SECONDS; -import static org.assertj.core.api.InstanceOfAssertFactories.optional; -import static org.assertj.core.api.InstanceOfAssertFactories.type; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.storage.uri.StorageUri; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3Utilities; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Exception; - -@ExtendWith(SoftAssertionsExtension.class) -public class TestS3ObjectIO { - @InjectSoftAssertions protected SoftAssertions soft; - - @SuppressWarnings("resource") - @Test - public void readObjectThrottledThrowsBackendThrottledException() { - S3Client s3client = mock(S3Client.class); - - Instant now = Instant.now(); - Clock clock = Clock.fixed(now, ZoneId.of("UTC")); - Duration defaultRetryAfter = Duration.of(10, SECONDS); - StorageUri location = StorageUri.of("s3://hello/foo/bar"); - - when(s3client.utilities()) - .thenReturn(S3Utilities.builder().region(Region.EU_CENTRAL_1).build()); - - when(s3client.getObject(any(GetObjectRequest.class))) - .thenThrow(S3Exception.builder().statusCode(429).message("blah").build()); - - S3ClientSupplier s3ClientSupplier = mock(S3ClientSupplier.class); - when(s3ClientSupplier.getClient(location)).thenReturn(s3client); - when(s3ClientSupplier.s3config()) - .thenReturn(S3Config.builder().retryAfter(defaultRetryAfter).build()); - - S3ObjectIO objectIO = new S3ObjectIO(s3ClientSupplier, clock); - - soft.assertThatThrownBy(() -> objectIO.readObject(location)) - .isInstanceOf(BackendThrottledException.class) - .asInstanceOf(type(BackendThrottledException.class)) - .extracting(BackendThrottledException::retryNotBefore, optional(Instant.class)) - .get() - .isEqualTo(now.plus(defaultRetryAfter)); - } -} diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java index 5015d83a4ad..e8cb9d7b74a 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/CatalogServiceImpl.java @@ -43,7 +43,6 @@ import jakarta.enterprise.context.RequestScoped; import jakarta.inject.Inject; import jakarta.inject.Named; -import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; @@ -117,12 +116,17 @@ public class CatalogServiceImpl implements CatalogService { @Inject NessieApiV2 nessieApi; @Inject Persist persist; @Inject TasksService tasksService; + @Inject EntitySnapshotTaskBehavior snapshotTaskBehavior; @Inject CatalogConfig catalogConfig; @Inject @Named("import-jobs") Executor executor; + private IcebergStuff icebergStuff() { + return new IcebergStuff(objectIO, persist, tasksService, snapshotTaskBehavior, executor); + } + @Override public Stream>> retrieveSnapshots( SnapshotReqParams reqParams, @@ -145,7 +149,7 @@ public Stream>> retrieveSnapshots( .keys(keys) .getWithResponse(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = icebergStuff(); Reference effectiveReference = contentResponse.getEffectiveReference(); effectiveReferenceConsumer.accept(effectiveReference); @@ -206,8 +210,7 @@ public CompletionStage retrieveSnapshot( ObjId snapshotId = snapshotIdFromContent(content); CompletionStage> snapshotStage = - new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + icebergStuff().retrieveIcebergSnapshot(snapshotId, content); return snapshotStage.thenApply( snapshot -> snapshotResponse(key, content, reqParams, snapshot, effectiveReference)); @@ -344,7 +347,7 @@ public CompletionStage> commit( Map contents = contentsResponse.toContentsMap(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = icebergStuff(); CommitMultipleOperationsBuilder nessieCommit = nessieApi.commitMultipleOperations().branch(target); @@ -672,25 +675,24 @@ static final class SingleTableUpdate { private CompletionStage loadExistingTableSnapshot(Content content) throws NessieContentNotFoundException { ObjId snapshotId = snapshotIdFromContent(content); - return new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } private CompletionStage loadExistingViewSnapshot(Content content) throws NessieContentNotFoundException { ObjId snapshotId = snapshotIdFromContent(content); - return new IcebergStuff(objectIO, persist, tasksService, executor) - .retrieveIcebergSnapshot(snapshotId, content); + return icebergStuff().retrieveIcebergSnapshot(snapshotId, content); } private IcebergTableMetadata storeTableSnapshot( String metadataJsonLocation, NessieTableSnapshot snapshot) { IcebergTableMetadata tableMetadata = nessieTableSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { + StorageUri uri = StorageUri.of(metadataJsonLocation); + try (OutputStream out = objectIO.writeObject(uri)) { IcebergJson.objectMapper().writeValue(out, tableMetadata); - } catch (IOException ex) { - throw new RuntimeException(ex); + } catch (Exception ex) { + throw new RuntimeException("Failed to write table snapshot to: " + uri, ex); } return tableMetadata; } @@ -699,10 +701,11 @@ private IcebergViewMetadata storeViewSnapshot( String metadataJsonLocation, NessieViewSnapshot snapshot) { IcebergViewMetadata viewMetadata = nessieViewSnapshotToIceberg(snapshot, Optional.empty(), p -> {}); - try (OutputStream out = objectIO.writeObject(StorageUri.of(metadataJsonLocation))) { + StorageUri uri = StorageUri.of(metadataJsonLocation); + try (OutputStream out = objectIO.writeObject(uri)) { IcebergJson.objectMapper().writeValue(out, viewMetadata); - } catch (IOException ex) { - throw new RuntimeException(ex); + } catch (Exception ex) { + throw new RuntimeException("Failed to write view snapshot to: " + uri, ex); } return viewMetadata; } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java index 5c7ee2fd699..86ec2adb4de 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskBehavior.java @@ -17,21 +17,29 @@ import static java.time.temporal.ChronoUnit.MINUTES; import static java.time.temporal.ChronoUnit.SECONDS; -import static org.projectnessie.catalog.service.impl.Util.throwableAsErrorTaskState; +import static org.projectnessie.nessie.tasks.api.TaskState.failureState; +import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static org.projectnessie.nessie.tasks.api.TaskState.runningState; import java.time.Clock; +import java.time.Duration; import java.time.Instant; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.service.objtypes.EntitySnapshotObj; import org.projectnessie.nessie.tasks.api.TaskBehavior; import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjType; -final class EntitySnapshotTaskBehavior +public final class EntitySnapshotTaskBehavior implements TaskBehavior { - static final EntitySnapshotTaskBehavior INSTANCE = new EntitySnapshotTaskBehavior(); + private final ObjectIOExceptionMapper exceptionMapper; + private final Duration taskTimeout; - private EntitySnapshotTaskBehavior() {} + public EntitySnapshotTaskBehavior(ObjectIOExceptionMapper exceptionMapper, Duration taskTimeout) { + this.exceptionMapper = exceptionMapper; + this.taskTimeout = taskTimeout; + } @Override public Throwable stateAsException(EntitySnapshotObj obj) { @@ -43,12 +51,17 @@ public Instant performRunningStateUpdateAt(Clock clock, EntitySnapshotObj runnin return clock.instant().plus(2, SECONDS); } + private TaskState taskState(@Nullable EntitySnapshotObj obj) { + return obj == null ? null : obj.taskState(); + } + @Override public TaskState runningTaskState(Clock clock, EntitySnapshotObj running) { Instant now = clock.instant(); Instant retryNotBefore = now.plus(2, SECONDS); Instant lostNotBefore = now.plus(1, MINUTES); - return runningState(retryNotBefore, lostNotBefore); + return runningState( + retryNotBefore, lostNotBefore, taskState(running), () -> now.plus(taskTimeout)); } @Override @@ -61,8 +74,31 @@ public ObjType objType() { return EntitySnapshotObj.OBJ_TYPE; } + @Override + public TaskState timedOutTaskState(Clock clock, EntitySnapshotObj base, Throwable t) { + // Allow immediate retry for unexpected errors (usually requires re-submitting the task). + return failureState(clock.instant(), t.toString()); + } + @Override public TaskState asErrorTaskState(Clock clock, EntitySnapshotObj base, Throwable t) { - return throwableAsErrorTaskState(t); + return exceptionMapper + .analyze(t) + .map( + status -> { + if (status.isRetryable()) { + return retryableErrorState( + status.reattemptAfter(), + t.toString(), + base.taskState(), + () -> clock.instant().plus(taskTimeout)); + } else { + return failureState(status.reattemptAfter(), t.toString()); + } + }) + // Use the deadline as "retry not before" for unexpected errors. This is only to allow + // re-attempting those tasks in principle, while we cannot have a more reasonable reattempt + // timeout. Introducing a separate config for that looks like an overkill. + .orElseGet(() -> failureState(clock.instant().plus(taskTimeout), t.toString())); } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java index dfed6d2e08b..cf32f27d4fd 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/EntitySnapshotTaskRequest.java @@ -43,10 +43,7 @@ default ObjType objType() { } @Override - @Value.NonAttribute - default TaskBehavior behavior() { - return EntitySnapshotTaskBehavior.INSTANCE; - } + TaskBehavior behavior(); @Override ObjId objId(); @@ -92,10 +89,11 @@ static EntitySnapshotTaskRequest entitySnapshotTaskRequest( ObjId objId, Content content, NessieEntitySnapshot snapshot, + EntitySnapshotTaskBehavior behavior, Persist persist, ObjectIO objectIO, Executor executor) { return ImmutableEntitySnapshotTaskRequest.of( - objId, content, snapshot, persist, objectIO, executor); + behavior, objId, content, snapshot, persist, objectIO, executor); } } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java index 82f232c51dc..06245de9be9 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/IcebergStuff.java @@ -43,13 +43,19 @@ public class IcebergStuff { private final ObjectIO objectIO; private final Persist persist; private final TasksService tasksService; + private final EntitySnapshotTaskBehavior snapshotTaskBehavior; private final Executor executor; public IcebergStuff( - ObjectIO objectIO, Persist persist, TasksService tasksService, Executor executor) { + ObjectIO objectIO, + Persist persist, + TasksService tasksService, + EntitySnapshotTaskBehavior snapshotTaskBehavior, + Executor executor) { this.objectIO = objectIO; this.persist = persist; this.tasksService = tasksService; + this.snapshotTaskBehavior = snapshotTaskBehavior; this.executor = executor; } @@ -61,7 +67,8 @@ public IcebergStuff( public > CompletionStage retrieveIcebergSnapshot( ObjId snapshotId, Content content) { EntitySnapshotTaskRequest snapshotTaskRequest = - entitySnapshotTaskRequest(snapshotId, content, null, persist, objectIO, executor); + entitySnapshotTaskRequest( + snapshotId, content, null, snapshotTaskBehavior, persist, objectIO, executor); return triggerIcebergSnapshot(snapshotTaskRequest); } @@ -90,7 +97,13 @@ public > CompletionStage storeSnapshot( S snapshot, Content content) { EntitySnapshotTaskRequest snapshotTaskRequest = entitySnapshotTaskRequest( - nessieIdToObjId(snapshot.id()), content, snapshot, persist, objectIO, executor); + nessieIdToObjId(snapshot.id()), + content, + snapshot, + snapshotTaskBehavior, + persist, + objectIO, + executor); return triggerIcebergSnapshot(snapshotTaskRequest); } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java index de0468b38eb..9183fc48c96 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java @@ -24,7 +24,6 @@ import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; import static org.projectnessie.versioned.storage.common.persist.ObjIdHasher.objIdHasher; -import java.io.IOException; import java.io.InputStream; import java.time.Instant; import java.util.UUID; @@ -66,31 +65,18 @@ final class ImportSnapshotWorker { EntitySnapshotObj.Builder importSnapshot() { Content content = taskRequest.content(); if (content instanceof IcebergTable) { - try { - return importIcebergTable( - (IcebergTable) content, (NessieTableSnapshot) taskRequest.snapshot()); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } + return importIcebergTable( + (IcebergTable) content, (NessieTableSnapshot) taskRequest.snapshot()); } if (content instanceof IcebergView) { - try { - return importIcebergView( - (IcebergView) content, (NessieViewSnapshot) taskRequest.snapshot()); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } + return importIcebergView((IcebergView) content, (NessieViewSnapshot) taskRequest.snapshot()); } throw new UnsupportedOperationException("Unsupported Nessie content type " + content.getType()); } private EntitySnapshotObj.Builder importIcebergTable( - IcebergTable content, NessieTableSnapshot snapshot) throws Exception { + IcebergTable content, NessieTableSnapshot snapshot) { NessieId snapshotId = objIdToNessieId(taskRequest.objId()); LOGGER.debug( @@ -107,8 +93,9 @@ private EntitySnapshotObj.Builder importIcebergTable( // snapshot!=null means that we already have the snapshot (via a committing operation - like a // table update) and do not need to import it but can just store it. if (snapshot == null) { - IcebergTableMetadata tableMetadata; StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation()); + NessieTable table; + IcebergTableMetadata tableMetadata; try { InputStream input = taskRequest.objectIO().readObject(metadataLocation); if (metadataLocation.requiredPath().endsWith(".gz") @@ -116,13 +103,12 @@ private EntitySnapshotObj.Builder importIcebergTable( input = new GZIPInputStream(input); } tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class); - } catch (IOException e) { - throw new IOException( + table = entityObjForContent(content, tableMetadata, entityObjId); + } catch (Exception e) { + throw new RuntimeException( "Failed to read table metadata from " + content.getMetadataLocation(), e); } - NessieTable table = entityObjForContent(content, tableMetadata, entityObjId); - snapshot = icebergTableSnapshotToNessie( snapshotId, @@ -176,7 +162,7 @@ private EntitySnapshotObj.Builder importIcebergTable( } private EntitySnapshotObj.Builder importIcebergView( - IcebergView content, NessieViewSnapshot snapshot) throws Exception { + IcebergView content, NessieViewSnapshot snapshot) { NessieId snapshotId = objIdToNessieId(taskRequest.objId()); LOGGER.debug( @@ -193,6 +179,7 @@ private EntitySnapshotObj.Builder importIcebergView( // snapshot!=null means that we already have the snapshot (via a committing operation - like a // table update) and do not need to import it but can just store it. if (snapshot == null) { + NessieView view; IcebergViewMetadata viewMetadata; StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation()); try { @@ -202,23 +189,22 @@ private EntitySnapshotObj.Builder importIcebergView( input = new GZIPInputStream(input); } viewMetadata = IcebergJson.objectMapper().readValue(input, IcebergViewMetadata.class); - } catch (IOException e) { - throw new IOException( + view = + entityObjForContent( + content, + viewMetadata, + entityObjId, + () -> + viewMetadata.versions().stream() + .filter(v -> v.versionId() == viewMetadata.currentVersionId()) + .findFirst() + .orElseThrow() + .timestampMs()); + } catch (Exception e) { + throw new RuntimeException( "Failed to read view metadata from " + content.getMetadataLocation(), e); } - NessieView view = - entityObjForContent( - content, - viewMetadata, - entityObjId, - () -> - viewMetadata.versions().stream() - .filter(v -> v.versionId() == viewMetadata.currentVersionId()) - .findFirst() - .orElseThrow() - .timestampMs()); - snapshot = icebergViewSnapshotToNessie(snapshotId, null, view, viewMetadata); } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java index 2583414aecc..aa6dbefc48d 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/Util.java @@ -18,17 +18,11 @@ import static org.projectnessie.catalog.model.id.NessieId.emptyNessieId; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromByteAccessor; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromLongs; -import static org.projectnessie.nessie.tasks.api.TaskState.failureState; -import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteAccessor; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromLongs; import static org.projectnessie.versioned.storage.common.persist.ObjId.zeroLengthObjId; -import java.util.Optional; -import java.util.function.Function; -import org.projectnessie.catalog.files.api.ObjectIOException; import org.projectnessie.catalog.model.id.NessieId; -import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjId; final class Util { @@ -55,39 +49,4 @@ static NessieId objIdToNessieId(ObjId id) { return nessieIdFromByteAccessor(id.size(), id::byteAt); } } - - static TaskState throwableAsErrorTaskState(Throwable throwable) { - return anyCauseMatches( - throwable, - e -> { - if (e instanceof ObjectIOException) { - return ((ObjectIOException) e) - .retryNotBefore() - .map(notBefore -> retryableErrorState(notBefore, e.toString())) - .orElse(null); - } - return null; - }) - .orElseGet(() -> failureState(throwable.toString())); - } - - static Optional anyCauseMatches( - Throwable throwable, Function mappingPredicate) { - if (throwable == null) { - return Optional.empty(); - } - for (Throwable e = throwable; e != null; e = e.getCause()) { - R r = mappingPredicate.apply(e); - if (r != null) { - return Optional.of(r); - } - for (Throwable sup : e.getSuppressed()) { - r = mappingPredicate.apply(sup); - if (r != null) { - return Optional.of(r); - } - } - } - return Optional.empty(); - } } diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java index 364a42060b0..6ed29cb832e 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestIcebergStuff.java @@ -26,6 +26,7 @@ import java.nio.file.Path; import java.time.Clock; +import java.time.Duration; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -43,6 +44,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.files.local.LocalObjectIO; import org.projectnessie.catalog.formats.iceberg.fixtures.IcebergGenerateFixtures; import org.projectnessie.catalog.model.snapshot.NessieTableSnapshot; @@ -106,8 +108,22 @@ public void retryImportManifests() { @MethodSource("icebergTableImports") public void icebergTableImports( @SuppressWarnings("unused") String testName, String icebergTableMetadata) throws Exception { + ObjectIOExceptionMapper exceptionMapper = + ObjectIOExceptionMapper.builder() + .clock(Clock.systemUTC()) + .retryAfterThrottled(Duration.ofMillis(1)) + .retryAfterNetworkError(Duration.ofMillis(1)) + .reattemptAfterFetchError(Duration.ofMillis(1)) + .build(); + ObjectIO objectIO = new LocalObjectIO(); - IcebergStuff icebergStuff = new IcebergStuff(objectIO, persist, tasksService, executor); + IcebergStuff icebergStuff = + new IcebergStuff( + objectIO, + persist, + tasksService, + new EntitySnapshotTaskBehavior(exceptionMapper, Duration.ofSeconds(5)), + executor); ObjId snapshotId = randomObjId(); IcebergTable icebergTable = diff --git a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java index 55f251274f5..7e4e8da3f65 100644 --- a/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java +++ b/catalog/service/impl/src/test/java/org/projectnessie/catalog/service/impl/TestUtil.java @@ -15,18 +15,13 @@ */ package org.projectnessie.catalog.service.impl; -import static java.time.temporal.ChronoUnit.MINUTES; import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.projectnessie.catalog.model.id.NessieId.emptyNessieId; import static org.projectnessie.catalog.model.id.NessieId.nessieIdFromBytes; -import static org.projectnessie.nessie.tasks.api.TaskState.failureState; -import static org.projectnessie.nessie.tasks.api.TaskState.retryableErrorState; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray; import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; import static org.projectnessie.versioned.storage.common.persist.ObjId.zeroLengthObjId; -import java.time.Instant; -import java.util.Optional; import java.util.stream.IntStream; import java.util.stream.Stream; import org.assertj.core.api.SoftAssertions; @@ -36,10 +31,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.catalog.files.api.ObjectIOException; import org.projectnessie.catalog.model.id.NessieId; -import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjId; @ExtendWith(SoftAssertionsExtension.class) @@ -144,52 +136,4 @@ static Stream nessieIdObjId() { .mapToObj(x -> randomObjId()) .map(objId -> arguments(nessieIdFromBytes(objId.asByteArray()), objId))); } - - @ParameterizedTest - @MethodSource - public void anyCauseMatches(Throwable throwable, Instant expected) { - soft.assertThat( - Util.anyCauseMatches( - throwable, - t -> - t instanceof ObjectIOException - ? ((ObjectIOException) t).retryNotBefore().orElse(null) - : null)) - .isEqualTo(Optional.ofNullable(expected)); - } - - @ParameterizedTest - @MethodSource("anyCauseMatches") - public void throwableAsErrorTaskState(Throwable throwable, Instant expected) { - TaskState taskState = Util.throwableAsErrorTaskState(throwable); - - soft.assertThat(taskState) - .isEqualTo( - expected != null - ? retryableErrorState(expected, BackendThrottledException.class.getName() + ": foo") - : failureState(throwable.toString())); - } - - static Stream anyCauseMatches() { - Instant retryNotBefore = Instant.now().plus(42, MINUTES); - BackendThrottledException throttled = new BackendThrottledException(retryNotBefore, "foo"); - - Exception sup1 = new Exception(); - sup1.addSuppressed(throttled); - - Exception sup2 = new Exception(); - sup2.addSuppressed(new RuntimeException()); - sup2.addSuppressed(throttled); - - return Stream.of( - arguments(throttled, retryNotBefore), - arguments(new Exception(throttled), retryNotBefore), - arguments(new Exception(new Exception(throttled)), retryNotBefore), - arguments(new Exception(), null), - arguments(new Exception(new Exception()), null), - arguments(sup1, retryNotBefore), - arguments(sup2, retryNotBefore), - arguments(new Exception(sup1), retryNotBefore), - arguments(new Exception(sup2), retryNotBefore)); - } } diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java index 51140966f13..813743150f6 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java @@ -28,8 +28,9 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletionException; +import java.util.OptionalInt; import java.util.stream.Collectors; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse; import org.projectnessie.catalog.formats.iceberg.rest.IcebergException; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; @@ -55,29 +56,34 @@ public class IcebergErrorMapper { private static final Logger LOGGER = LoggerFactory.getLogger(IcebergErrorMapper.class); private final ExceptionConfig exceptionConfig; + private final ObjectIOExceptionMapper ioExceptionMapper; @Inject - public IcebergErrorMapper(ExceptionConfig exceptionConfig) { + public IcebergErrorMapper( + ExceptionConfig exceptionConfig, ObjectIOExceptionMapper ioExceptionMapper) { this.exceptionConfig = exceptionConfig; + this.ioExceptionMapper = ioExceptionMapper; } public Response toResponse(Throwable ex, IcebergEntityKind kind) { - if (ex instanceof CompletionException) { - ex = ex.getCause(); - } - - if (ex.getClass() == RuntimeException.class && ex.getCause() != null) { - ex = ex.getCause(); + IcebergErrorResponse body = null; + for (Throwable th = ex; th != null; th = th.getCause()) { + if (th instanceof BaseNessieClientServerException) { + BaseNessieClientServerException e = (BaseNessieClientServerException) th; + body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind); + break; + } else if (th instanceof IllegalArgumentException) { + body = errorResponse(400, "IllegalArgumentException", th.getMessage(), th); + break; + } else if (th instanceof IcebergException) { + body = ((IcebergException) th).toErrorResponse(); + break; + } } - IcebergErrorResponse body = null; - if (ex instanceof BaseNessieClientServerException) { - BaseNessieClientServerException e = (BaseNessieClientServerException) ex; - body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind); - } else if (ex instanceof IllegalArgumentException) { - body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex); - } else if (ex instanceof IcebergException) { - body = ((IcebergException) ex).toErrorResponse(); + OptionalInt status = ioExceptionMapper.httpStatusCode(ex); + if (status.isPresent()) { + body = mapStorageFailure(status.getAsInt(), ex); } if (body == null) { @@ -89,6 +95,24 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) { return Response.status(code == null ? 500 : code).entity(body).build(); } + private IcebergErrorResponse mapStorageFailure(int httpStatusCode, Throwable ex) { + // Log full stack trace on the server side for troubleshooting + LOGGER.info("Propagating storage failure to client: {}", ex, ex); + + switch (httpStatusCode) { + case 401: + return errorResponse(httpStatusCode, "NotAuthorizedException", ex.toString(), ex); + case 403: + return errorResponse(httpStatusCode, "ForbiddenException", ex.toString(), ex); + case 404: + // Convert storage-side "not found" into `IllegalArgumentException`. + // In most cases this results from bad locations in Iceberg metadata. + return errorResponse(400, "IllegalArgumentException", ex.toString(), ex); + default: + return null; + } + } + private IcebergErrorResponse mapNessieError( Exception ex, ErrorCode err, NessieErrorDetails errorDetails, IcebergEntityKind kind) { switch (err) { diff --git a/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java b/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java index da5e8047e89..ed79c7e69f9 100644 --- a/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java +++ b/cli/cli/src/intTest/java/org/projectnessie/nessie/cli/commands/WithNessie.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.time.Clock; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -133,7 +132,7 @@ protected static void setupObjectStoreAndNessie( .collect(Collectors.toMap(k -> k, k -> Map.of("secret", "secret")))), sessions); - ObjectIO objectIO = new S3ObjectIO(clientSupplier, Clock.systemUTC()); + ObjectIO objectIO = new S3ObjectIO(clientSupplier); tableOneMetadataLocation = generateMetadataWithManifestList( diff --git a/servers/quarkus-common/build.gradle.kts b/servers/quarkus-common/build.gradle.kts index 320a5e7d4be..505adfc436f 100644 --- a/servers/quarkus-common/build.gradle.kts +++ b/servers/quarkus-common/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { implementation(project(":nessie-services")) implementation(project(":nessie-services-config")) implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-catalog-files-api")) implementation(project(":nessie-catalog-files-impl")) implementation(project(":nessie-catalog-service-common")) implementation(project(":nessie-catalog-secrets-api")) diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java index f6630a21fc4..6cd88fef58b 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogS3Config.java @@ -16,7 +16,6 @@ package org.projectnessie.quarkus.config; import io.smallrye.config.ConfigMapping; -import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; import java.time.Duration; import java.util.Map; @@ -37,10 +36,6 @@ */ @ConfigMapping(prefix = "nessie.catalog.service.s3") public interface CatalogS3Config extends S3Config, S3Options { - @WithName("throttled-retry-after") - @WithDefault("PT10S") - @Override - Optional retryAfter(); @WithName("http.expect-continue-enabled") @Override diff --git a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java index b91c0171e8f..da83a1b57a1 100644 --- a/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java +++ b/servers/quarkus-common/src/main/java/org/projectnessie/quarkus/config/CatalogServiceConfig.java @@ -19,9 +19,34 @@ import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; import java.time.Duration; +import org.projectnessie.catalog.files.api.ObjectIOConfig; @ConfigMapping(prefix = "nessie.catalog.service") -public interface CatalogServiceConfig { +public interface CatalogServiceConfig extends ObjectIOConfig { + /** + * Interval after which a request is retried when storage I/O responds with some "retry later" + * response. + */ + @WithName("throttled-retry-after") // TODO: update helm helpers + @WithDefault("PT10S") + Duration retryAfterThrottled(); + + @WithName("network-error-retry-after") + @WithDefault("PT30S") + Duration retryAfterNetworkError(); + + @WithName("reattempt-after-fetch-error") + @WithDefault("PT60S") + Duration reattemptAfterFetchError(); + + /** + * The maximum time period to wait for tasks importing data from object stores (including + * retries). + */ + @WithName("imports.task-timeout") + @WithDefault("PT10M") + Duration importTaskTimeout(); + /** Advanced property, defines the maximum number of concurrent imports from object stores. */ @WithName("imports.max-concurrent") @WithDefault("32") diff --git a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java index db512c910fe..b1a1dadba38 100644 --- a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java +++ b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java @@ -27,6 +27,7 @@ import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterAll; @@ -97,6 +98,7 @@ void testCreateTableForbidden() { Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); // Attempts to create files blocked by the session IAM policy break the createTable() call assertThatThrownBy(() -> catalog.createTable(TableIdentifier.of(ns, "table1"), schema)) + .isInstanceOf(ForbiddenException.class) .hasMessageContaining("S3Exception: Access Denied") // make sure the error comes from the Catalog Server .hasStackTraceContaining("org.apache.iceberg.rest.RESTClient"); diff --git a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java index 7036f16fe65..80796985460 100644 --- a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java +++ b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/CatalogProducers.java @@ -43,24 +43,29 @@ import org.projectnessie.catalog.files.ResolvingObjectIO; import org.projectnessie.catalog.files.adls.AdlsClientSupplier; import org.projectnessie.catalog.files.adls.AdlsClients; +import org.projectnessie.catalog.files.adls.AdlsExceptionMapper; import org.projectnessie.catalog.files.adls.AdlsFileSystemOptions; import org.projectnessie.catalog.files.adls.AdlsOptions; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; import org.projectnessie.catalog.files.api.RequestSigner; import org.projectnessie.catalog.files.gcs.GcsBucketOptions; import org.projectnessie.catalog.files.gcs.GcsClients; +import org.projectnessie.catalog.files.gcs.GcsExceptionMapper; import org.projectnessie.catalog.files.gcs.GcsOptions; import org.projectnessie.catalog.files.gcs.GcsStorageSupplier; import org.projectnessie.catalog.files.s3.S3BucketOptions; import org.projectnessie.catalog.files.s3.S3ClientSupplier; import org.projectnessie.catalog.files.s3.S3Clients; import org.projectnessie.catalog.files.s3.S3CredentialsResolver; +import org.projectnessie.catalog.files.s3.S3ExceptionMapper; import org.projectnessie.catalog.files.s3.S3Options; import org.projectnessie.catalog.files.s3.S3Sessions; import org.projectnessie.catalog.files.s3.S3SessionsManager; import org.projectnessie.catalog.files.s3.S3Signer; import org.projectnessie.catalog.secrets.SecretsProvider; import org.projectnessie.catalog.service.config.CatalogConfig; +import org.projectnessie.catalog.service.impl.EntitySnapshotTaskBehavior; import org.projectnessie.client.api.NessieApiV2; import org.projectnessie.nessie.combined.CombinedClientBuilder; import org.projectnessie.nessie.tasks.async.TasksAsync; @@ -174,6 +179,7 @@ public HttpClient adlsHttpClient(CatalogAdlsConfig adlsConfig) { @Produces @Singleton public ObjectIO objectIO( + ObjectIOExceptionMapper exceptionMapper, CatalogS3Config s3config, @CatalogS3Client SdkHttpClient sdkClient, CatalogAdlsConfig adlsConfig, @@ -201,6 +207,27 @@ public RequestSigner signer( return new S3Signer(s3config, secretsProvider, s3sessions); } + @Produces + @Singleton + public EntitySnapshotTaskBehavior entitySnapshotTaskBehavior( + CatalogServiceConfig config, ObjectIOExceptionMapper mapper) { + return new EntitySnapshotTaskBehavior(mapper, config.importTaskTimeout()); + } + + @Produces + @Singleton + public ObjectIOExceptionMapper objectIOExceptionMapper(CatalogServiceConfig config) { + return ObjectIOExceptionMapper.builder() + .clock(systemUTC()) + .retryAfterThrottled(config.retryAfterThrottled()) + .retryAfterNetworkError(config.retryAfterNetworkError()) + .reattemptAfterFetchError(config.reattemptAfterFetchError()) + .addAnalyzer(AdlsExceptionMapper.INSTANCE) + .addAnalyzer(GcsExceptionMapper.INSTANCE) + .addAnalyzer(S3ExceptionMapper.INSTANCE) + .build(); + } + /** * Provides the {@link TasksAsync} instance backed by a thread-pool executor configured according * to {@link CatalogServiceConfig}, with thread-context propagation. diff --git a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java index 7de337312b5..04143be4323 100644 --- a/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java +++ b/servers/quarkus-server/src/main/java/org/projectnessie/server/catalog/MonitoredTaskServiceMetrics.java @@ -44,6 +44,9 @@ public void taskAttemptRunning() {} @Override public void taskAttemptErrorRetry() {} + @Override + public void taskAttemptRecover() {} + @Override public void taskCreation() {} diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java index 78a5f7f8022..3e5cc86f6c2 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java @@ -17,6 +17,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.InstanceOfAssertFactories.STRING; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -35,17 +36,26 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.IntStream; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.RESTSerializers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.projectnessie.client.api.NessieApiV2; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.objectstoragemock.HeapStorageBucket; +import org.projectnessie.server.catalog.ObjectStorageMockTestResourceLifecycleManager.AccessCheckHandlerHolder; public abstract class AbstractIcebergCatalogUnitTests extends AbstractIcebergCatalogTests { HeapStorageBucket heapStorageBucket; + AccessCheckHandlerHolder accessCheckHandler; @BeforeEach public void clearBucket() { @@ -205,4 +215,54 @@ public void namespacesPaging() throws Exception { .mapToObj(i -> TableIdentifier.of("namespace_0", "view_" + i)) .toList()); } + + @Test + void testStorageReadFailure() throws Exception { + @SuppressWarnings("resource") + RESTCatalog catalog = catalog(); + + TableIdentifier id1 = TableIdentifier.of("ns", "table1"); + catalog.createNamespace(id1.namespace()); + Table table = catalog.buildTable(id1, SCHEMA).create(); + + try (NessieApiV2 api = nessieClientBuilder().build(NessieApiV2.class)) { + api.createNamespace().reference(api.getDefaultBranch()).namespace("test-ns").create(); + api.commitMultipleOperations() + .commitMeta(CommitMeta.fromMessage("test")) + .branch(api.getDefaultBranch()) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table2"), + IcebergTable.of(table.location() + "_test_access_denied_file", 1, 2, 3, 4))) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table3"), + IcebergTable.of(table.location() + "_test_non_existent_file", 1, 2, 3, 4))) + .commit(); + } + + accessCheckHandler.set(key -> !key.contains("_test_access_denied_")); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table2"))) + .isInstanceOf(ForbiddenException.class) + .hasMessageContaining("_test_access_denied_file"); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table3"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("_test_non_existent_file"); + } + + @Test + void testStorageWriteFailure() throws Exception { + @SuppressWarnings("resource") + RESTCatalog catalog = catalog(); + + TableIdentifier id1 = TableIdentifier.of("ns", "table_access_denied"); + catalog.createNamespace(id1.namespace()); + + accessCheckHandler.set(key -> !key.contains("table_access_denied")); + assertThatThrownBy(() -> catalog.buildTable(id1, SCHEMA).create()) + .isInstanceOf(ForbiddenException.class) + .hasMessageContaining("table_access_denied"); + } } diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java index be62f84fc27..43c3aaf6146 100644 --- a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java @@ -19,6 +19,7 @@ import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.projectnessie.objectstoragemock.AccessCheckHandler; import org.projectnessie.objectstoragemock.HeapStorageBucket; import org.projectnessie.objectstoragemock.ObjectStorageMock; import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer; @@ -45,6 +46,7 @@ public static String bucketWarehouseLocation(String scheme) { "ObjectStorageMockTestResourceLifecycleManager.initAddress"; private final AssumeRoleHandlerHolder assumeRoleHandler = new AssumeRoleHandlerHolder(); + private final AccessCheckHandlerHolder accessCheckHandler = new AccessCheckHandlerHolder(); private HeapStorageBucket heapStorageBucket; private MockServer server; @@ -58,6 +60,7 @@ public Map start() { .initAddress("localhost") .putBuckets(BUCKET, heapStorageBucket.bucket()) .assumeRoleHandler(assumeRoleHandler) + .accessCheckHandler(accessCheckHandler) .build() .start(); @@ -92,6 +95,9 @@ public void inject(TestInjector testInjector) { testInjector.injectIntoFields( assumeRoleHandler, new TestInjector.MatchesType(AssumeRoleHandlerHolder.class)); + + testInjector.injectIntoFields( + accessCheckHandler, new TestInjector.MatchesType(AccessCheckHandlerHolder.class)); } @Override @@ -137,4 +143,18 @@ public AssumeRoleResult assumeRole( serialNumber); } } + + public static final class AccessCheckHandlerHolder implements AccessCheckHandler { + private final AtomicReference handler = new AtomicReference<>(); + + public void set(AccessCheckHandler handler) { + this.handler.set(handler); + } + + @Override + public boolean accessAllowed(String objectKey) { + AccessCheckHandler delegate = handler.get(); + return delegate == null || delegate.accessAllowed(objectKey); + } + } } diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java index 076d7318b41..18b7936468b 100644 --- a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskBehavior.java @@ -15,8 +15,6 @@ */ package org.projectnessie.nessie.tasks.api; -import static org.projectnessie.nessie.tasks.api.TaskState.failureState; - import java.time.Clock; import java.time.Instant; import org.projectnessie.versioned.storage.common.persist.ObjType; @@ -52,14 +50,14 @@ public interface TaskBehavior { */ TaskState runningTaskState(Clock clock, T running); + TaskState timedOutTaskState(Clock clock, T base, Throwable t); + /** * Called when the task execution resulted in an exception, Build a new {@linkplain * TaskStatus#ERROR_RETRY error-retry}, with "fresh" {@linkplain TaskState#retryNotBefore() * retry-not-before} timestamp, or {@linkplain TaskStatus#FAILURE failure} task-state. */ - default TaskState asErrorTaskState(Clock clock, T base, Throwable t) { - return failureState(t.toString()); - } + TaskState asErrorTaskState(Clock clock, T base, Throwable t); /** Create a new, empty task-object builder. */ B newObjBuilder(); diff --git a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java index e97fc122659..94cb2b81f1d 100644 --- a/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java +++ b/tasks/api/src/main/java/org/projectnessie/nessie/tasks/api/TaskState.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.common.base.Supplier; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.time.Instant; @@ -69,27 +70,52 @@ public interface TaskState { @JsonInclude(JsonInclude.Include.NON_NULL) String message(); - TaskState SUCCESS = ImmutableTaskState.of(TaskStatus.SUCCESS, null, null, null); + /** + * Represents the deadline when a task will fail even if it gets a re-triable error. Note: passing + * the deadline may or may not forcibly cancel a running task. + */ + @Value.Parameter(order = 5) + @Nullable + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonDeserialize(using = InstantAsLongDeserializer.class) + @JsonSerialize(using = InstantAsLongSerializer.class) + Instant deadline(); + + TaskState SUCCESS = ImmutableTaskState.builder().status(TaskStatus.SUCCESS).build(); static TaskState successState() { return SUCCESS; } - static TaskState runningState(@Nonnull Instant retryNotBefore, @Nonnull Instant lostNotBefore) { - return ImmutableTaskState.of(TaskStatus.RUNNING, retryNotBefore, lostNotBefore, null); + private static Instant deadlineFor(@Nullable TaskState old, Supplier deadline) { + if (old == null || old.deadline() == null) { + return deadline.get(); + } + + return old.deadline(); } - static TaskState retryableErrorState(@Nonnull Instant retryNotBefore, @Nonnull String message) { - return ImmutableTaskState.of(TaskStatus.ERROR_RETRY, retryNotBefore, null, message); + static TaskState runningState( + @Nonnull Instant retryNotBefore, + @Nonnull Instant lostNotBefore, + @Nullable TaskState previous, + Supplier deadline) { + return ImmutableTaskState.of( + TaskStatus.RUNNING, retryNotBefore, lostNotBefore, null, deadlineFor(previous, deadline)); } - static TaskState failureState(@Nonnull String message) { - return ImmutableTaskState.of(TaskStatus.FAILURE, null, null, message); + static TaskState retryableErrorState( + @Nonnull Instant retryNotBefore, + @Nonnull String message, + @Nullable TaskState previous, + Supplier deadline) { + return ImmutableTaskState.of( + TaskStatus.ERROR_RETRY, retryNotBefore, null, message, deadlineFor(previous, deadline)); } - static TaskState taskState( - TaskStatus taskStatus, Instant retryNotBefore, Instant lostNotBefore, String message) { - return ImmutableTaskState.of(taskStatus, retryNotBefore, lostNotBefore, message); + static TaskState failureState( + @Nullable Instant attemptRecoverNotBefore, @Nonnull String message) { + return ImmutableTaskState.of(TaskStatus.FAILURE, attemptRecoverNotBefore, null, message, null); } @Value.Check @@ -98,19 +124,22 @@ default void check() { case SUCCESS: checkState(retryNotBefore() == null, "retryNotBefore must be null for SUCCESS"); checkState(lostNotBefore() == null, "retryNotBefore must be null for SUCCESS"); + checkState(deadline() == null, "deadline must be null for SUCCESS"); break; case FAILURE: - checkState(retryNotBefore() == null, "retryNotBefore must be null for FAILURE"); checkState(lostNotBefore() == null, "lostNotBefore must be null for FAILURE"); + checkState(deadline() == null, "deadline must be null for FAILURE"); checkState(message() != null, "message must not be null for FAILURE"); break; case RUNNING: checkState(retryNotBefore() != null, "retryNotBefore must not be null for RUNNING"); checkState(lostNotBefore() != null, "lostNotBefore must not be null for RUNNING"); + checkState(deadline() != null, "deadline must not be null for RUNNING"); break; case ERROR_RETRY: checkState(retryNotBefore() != null, "retryNotBefore must not be null for ERROR_RETRY"); checkState(lostNotBefore() == null, "lostNotBefore must be null for ERROR_RETRY"); + checkState(deadline() != null, "deadline must not be null for ERROR_RETRY"); checkState(message() != null, "message must not be null for ERROR_RETRY"); break; default: diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java index 2803fdb12eb..d6d70e2941d 100644 --- a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TaskServiceMetrics.java @@ -40,6 +40,8 @@ public interface TaskServiceMetrics { /** Task attempt detected that task ran into a retryable error. */ void taskAttemptErrorRetry(); + void taskAttemptRecover(); + /** New task object is being created in the database. */ void taskCreation(); diff --git a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java index 7496b34425d..c5a512a1dbc 100644 --- a/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java +++ b/tasks/service/impl/src/main/java/org/projectnessie/nessie/tasks/service/impl/TasksServiceImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.immutables.value.Value; @@ -114,11 +115,16 @@ CompletionStage submit( Obj obj = persist.getImmediate(taskRequest.objId()); if (obj != null) { T taskObj = castObj(taskRequest, obj); - TaskStatus status = taskObj.taskState().status(); + TaskState taskState = taskObj.taskState(); + TaskStatus status = taskState.status(); switch (status) { case FAILURE: - metrics.taskHasFinalFailure(); - return failedStage(taskRequest.behavior().stateAsException(taskObj)); + if (canRetryNow(taskState)) { + break; + } else { + metrics.taskHasFinalFailure(); + return failedStage(taskRequest.behavior().stateAsException(taskObj)); + } case SUCCESS: metrics.taskHasFinalSuccess(); return completedStage(taskObj); @@ -155,6 +161,20 @@ CompletionStage submit( return r; } + private boolean canRetryNow(TaskState state) { + Instant now = async.clock().instant(); + return now.compareTo(requireNonNull(state.retryNotBefore())) >= 0; + } + + private boolean expired(TaskState state) { + Instant deadline = state.deadline(); + if (deadline == null) { + return false; + } + Instant now = async.clock().instant(); + return now.compareTo(deadline) >= 0; + } + private void finalResult(ExecParams params, TaskObj result) { try { params.resultFuture.complete(result); @@ -196,8 +216,13 @@ private void tryLocal(ExecParams params) { finalResult(params, obj); break; case FAILURE: - metrics.taskAttemptFinalFailure(); - finalFailure(params, params.taskRequest.behavior().stateAsException(obj)); + if (canRetryNow(state)) { + metrics.taskAttemptRecover(); + maybeAttemptErrorRetry(params, obj, true); + } else { + metrics.taskAttemptFinalFailure(); + finalFailure(params, params.taskRequest.behavior().stateAsException(obj)); + } break; case RUNNING: metrics.taskAttemptRunning(); @@ -205,7 +230,7 @@ private void tryLocal(ExecParams params) { break; case ERROR_RETRY: metrics.taskAttemptErrorRetry(); - maybeAttemptErrorRetry(params, state, obj); + maybeAttemptErrorRetry(params, obj, false); break; default: throw new IllegalStateException("Unknown task status " + state.status()); @@ -285,17 +310,18 @@ private void checkRunningTask(ExecParams params, TaskState state, TaskObj obj) } // Called while ExecParams is locked from tryLocal() - private void maybeAttemptErrorRetry(ExecParams params, TaskState state, TaskObj obj) + private void maybeAttemptErrorRetry(ExecParams params, TaskObj obj, boolean recoverFromFailure) throws ObjTooLargeException { - Instant now = async.clock().instant(); - if (now.compareTo(requireNonNull(state.retryNotBefore())) >= 0) { + if (recoverFromFailure || canRetryNow(obj.taskState())) { TaskBehavior behavior = params.taskRequest.behavior(); TaskObj retryState = withNewVersionToken( behavior .newObjBuilder() .from(obj) - .taskState(behavior.runningTaskState(async.clock(), obj))); + .taskState( + // Use null TaskObj to reset deadlines when recovering from failures. + behavior.runningTaskState(async.clock(), recoverFromFailure ? null : obj))); if (params.persist.updateConditional(obj, retryState)) { metrics.taskRetryStateChangeSucceeded(); @@ -305,7 +331,7 @@ private void maybeAttemptErrorRetry(ExecParams params, TaskState state, TaskObj reattemptAfterRace(params); } } else { - async.schedule(() -> tryLocal(params), state.retryNotBefore()); + async.schedule(() -> tryLocal(params), obj.taskState().retryNotBefore()); } } @@ -379,7 +405,15 @@ private void localTaskFinished( LOGGER.trace("{}: Task execution for {} failed, updating database", name, params); TaskBehavior behavior = params.taskRequest.behavior(); - TaskState newState = behavior.asErrorTaskState(async.clock(), expected, failure); + TaskState newState; + if (expired(expected.taskState())) { + Exception te = new TimeoutException("Deadline exceeded after: " + failure); + te.addSuppressed(failure); + failure = te; + newState = behavior.timedOutTaskState(async.clock(), expected, failure); + } else { + newState = behavior.asErrorTaskState(async.clock(), expected, failure); + } checkState(newState.status().isError()); TaskObj updatedObj = withNewVersionToken(behavior.newObjBuilder().from(expected).taskState(newState)); diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java index d1bdad89c68..98d5f304560 100644 --- a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/impl/TestTasksServiceImpl.java @@ -26,7 +26,9 @@ import static org.projectnessie.nessie.tasks.service.TasksServiceConfig.DEFAULT_RACE_WAIT_MILLIS_MIN; import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.FRESH_LOST_RETRY_NOT_BEFORE; import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.FRESH_RUNNING_RETRY_NOT_BEFORE; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.RECOVER_NOT_BEFORE; import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.RETRYABLE_ERROR_NOT_BEFORE; +import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskBehavior.RETRYABLE_ERROR_TOTAL_TIMEOUT; import static org.projectnessie.nessie.tasks.service.tasktypes.BasicTaskRequest.basicTaskRequest; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_REPOSITORY_ID; @@ -774,6 +776,218 @@ public void singleServiceSingleConsumerRetryableError() throws Exception { verifyNoMoreInteractions(metrics); } + @Test + public void singleServiceStopPerpetualRetry() throws Exception { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + AtomicReference> taskCompletionStage = + new AtomicReference<>(new CompletableFuture<>()); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", taskCompletionStage::get); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskCompletionStage.get().completeExceptionally(new RetryableException("retryable")); + + clock.add(500, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionRetryableError(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + Instant firstError = clock.instant(); + + clock.add(RETRYABLE_ERROR_TOTAL_TIMEOUT.dividedBy(4)); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionRetryableError(); + reset(metrics); + + clock.add(RETRYABLE_ERROR_TOTAL_TIMEOUT.dividedBy(4)); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionRetryableError(); + reset(metrics); + + clock.add(RETRYABLE_ERROR_TOTAL_TIMEOUT.dividedBy(4)); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isNotDone(); + verify(metrics).taskAttempt(); + verify(metrics).taskExecutionRetryableError(); + verify(metrics).taskExecutionFinished(); + reset(metrics); + + clock.set(firstError.plus(RETRYABLE_ERROR_TOTAL_TIMEOUT)); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isDone(); + soft.assertThat(taskFuture).isCompletedExceptionally(); + verify(metrics).taskAttempt(); + verify(metrics).taskRetryStateChangeSucceeded(); + verify(metrics).taskAttemptErrorRetry(); + verify(metrics).taskExecution(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // need a new CompletableFuture that can be used when the task's execution is re-triggered + taskCompletionStage.set(new CompletableFuture<>()); + // Re-submit to recover + clock.add(RECOVER_NOT_BEFORE); + taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + soft.assertThat(taskFuture).isNotDone(); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskAttempt(); + verify(metrics).startNewTaskController(); + verify(metrics).taskAttemptRecover(); + verify(metrics).taskExecution(); + verify(metrics).taskRetryStateChangeSucceeded(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskCompletionStage + .get() + .complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture).isCompleted(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResult(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + soft.assertThat(taskFuture.get()) + .asInstanceOf(type(BasicTaskObj.class)) + .extracting(BasicTaskObj::taskResult) + .isEqualTo("hello finished"); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + + @Test + public void singleServiceRecoverFromError() throws Exception { + MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); + TestingTasksAsync async = new TestingTasksAsync(clock); + + AtomicReference> taskCompletionStage = + new AtomicReference<>(new CompletableFuture<>()); + + TaskServiceMetrics metrics = mock(TaskServiceMetrics.class); + TasksServiceImpl service = new TasksServiceImpl(async, metrics, tasksServiceConfig(1)); + Tasks tasks = service.forPersist(persist); + BasicTaskRequest taskRequest = basicTaskRequest("hello", taskCompletionStage::get); + + CompletableFuture taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + verify(metrics).startNewTaskController(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(500, ChronoUnit.MILLIS); + taskCompletionStage.get().completeExceptionally(new Exception("test auth failure")); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isDone(); + soft.assertThat(taskFuture).isCompletedExceptionally(); + verify(metrics).taskAttempt(); + verify(metrics).taskCreation(); + verify(metrics).taskExecution(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + soft.assertThat(async.doWork()).isEqualTo(1); + soft.assertThat(taskFuture).isDone(); + soft.assertThat(taskFuture).isCompletedExceptionally(); + verify(metrics).startNewTaskController(); + verify(metrics).taskAttempt(); + verify(metrics).taskAttemptFinalFailure(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + // need a new CompletableFuture that can be used when the task's execution is re-triggered + taskCompletionStage.set(new CompletableFuture<>()); + + clock.add(RECOVER_NOT_BEFORE); + + taskFuture = tasks.submit(taskRequest).toCompletableFuture(); + soft.assertThat(taskFuture).isNotDone(); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskAttempt(); + verify(metrics).startNewTaskController(); + verify(metrics).taskAttemptRecover(); + verify(metrics).taskExecution(); + verify(metrics).taskRetryStateChangeSucceeded(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(250, ChronoUnit.MILLIS); + soft.assertThat(async.doWork()).isEqualTo(1); + verify(metrics).taskUpdateRunningState(); + verify(metrics).taskRunningStateUpdated(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + taskCompletionStage + .get() + .complete( + BasicTaskObj.builder() + .id(taskRequest.objId()) + .taskParameter(taskRequest.taskParameter()) + .taskResult(taskRequest.taskParameter() + " finished") + .taskState(TaskState.successState())); + soft.assertThat(taskFuture).isCompleted(); + verify(metrics).taskExecutionFinished(); + verify(metrics).taskExecutionResult(); + verifyNoMoreInteractions(metrics); + reset(metrics); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + + soft.assertThat(taskFuture.get()) + .asInstanceOf(type(BasicTaskObj.class)) + .extracting(BasicTaskObj::taskResult) + .isEqualTo("hello finished"); + + clock.add(1, ChronoUnit.SECONDS); + soft.assertThat(async.doWork()).isEqualTo(0); + verifyNoMoreInteractions(metrics); + } + @Test public void twoServicesDistributedTaskFailure() { MutableClock clock = MutableClock.of(Instant.now(), ZoneId.of("UTC")); diff --git a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java index 5723bc64f3a..982a06a5897 100644 --- a/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java +++ b/tasks/service/impl/src/test/java/org/projectnessie/nessie/tasks/service/tasktypes/BasicTaskBehavior.java @@ -24,7 +24,9 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAmount; +import javax.annotation.Nullable; import org.projectnessie.nessie.tasks.api.TaskBehavior; +import org.projectnessie.nessie.tasks.api.TaskObj; import org.projectnessie.nessie.tasks.api.TaskState; import org.projectnessie.versioned.storage.common.persist.ObjType; @@ -36,6 +38,8 @@ public class BasicTaskBehavior implements TaskBehavior deadline(clock)); } - return failureState(t.toString()); + return failureState(recoverNotBefore(clock), t.toString()); } @Override @@ -67,7 +81,16 @@ public BasicTaskObj.Builder newObjBuilder() { @Override public TaskState runningTaskState(Clock clock, BasicTaskObj running) { - return runningState(freshRunningRetryNotBefore(clock), freshLostRetryNotBefore(clock)); + return runningState( + freshRunningRetryNotBefore(clock), + freshLostRetryNotBefore(clock), + taskState(running), + () -> deadline(clock)); + } + + @Override + public TaskState timedOutTaskState(Clock clock, BasicTaskObj base, Throwable t) { + return failureState(clock.instant(), t.toString()); } @Override @@ -86,4 +109,8 @@ private Instant freshLostRetryNotBefore(Clock clock) { private Instant retryableErrorNotBefore(Clock clock) { return clock.instant().plus(RETRYABLE_ERROR_NOT_BEFORE); } + + private Instant recoverNotBefore(Clock clock) { + return clock.instant().plus(RECOVER_NOT_BEFORE); + } } diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java new file mode 100644 index 00000000000..78f85527216 --- /dev/null +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.objectstoragemock; + +public interface AccessCheckHandler { + boolean accessAllowed(String objectKey); +} diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java index b59f1e921b0..1592c3a5aab 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java @@ -84,6 +84,7 @@ public Response create( return withFilesystem( filesystem, + normalizedPath, b -> { Bucket.ObjectUpdater updater = b.updater().update(normalizedPath, Bucket.UpdaterMode.CREATE_NEW); @@ -116,6 +117,7 @@ public Response update( return withFilesystem( filesystem, + normalizedPath, b -> { if (!action.appendOrFlush()) { return notImplemented(); @@ -151,6 +153,7 @@ public Response read( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -194,6 +197,7 @@ public Response getProperties( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -225,6 +229,7 @@ public Response delete( return withFilesystem( filesystem, + normalizedPath, b -> { if (recursive) { try (Stream listStream = @@ -262,6 +267,7 @@ public Response list( return withFilesystem( filesystem, + normalizedPath, b -> { try (Stream listStream = b.lister().list(normalizedPath, continuationToken)) { @@ -349,6 +355,10 @@ private static Response keyNotFound() { Status.NOT_FOUND, "PathNotFound", "The specified path does not exist."); } + private static Response accessDenied() { + return dataLakeStorageError(Status.FORBIDDEN, "Forbidden", "Access Denied."); + } + private static Response dataLakeStorageError(Status status, String code, String message) { return Response.status(status) .header("x-ms-error-code", code) @@ -361,7 +371,12 @@ private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } - private Response withFilesystem(String filesystem, Function worker) { + private Response withFilesystem( + String filesystem, String path, Function worker) { + if (!mockServer.accessCheckHandler().accessAllowed(path)) { + return accessDenied(); + } + Bucket bucket = mockServer.buckets().get(filesystem); if (bucket == null) { return bucketNotFound(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java index 5ac68c901a9..0dc10735edd 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java @@ -174,6 +174,7 @@ public Response deleteObject( @PathParam("bucketName") String bucketName, @PathParam("object") String objectName) { return withBucket( bucketName, + objectName, b -> { if (!b.deleter().delete(objectName)) { return keyNotFound(); @@ -277,6 +278,7 @@ public Response insertObject( InputStream stream) { return withBucket( bucketName, + objectName, bucket -> { try { Bucket.Updater updater = bucket.updater(); @@ -380,6 +382,10 @@ private static Response keyNotFound() { return errorResponse(Status.NOT_FOUND, "The specified key does not exist."); } + private static Response accessDenied() { + return errorResponse(Status.FORBIDDEN, "Access Denied."); + } + private static Response errorResponse(Status status, String message) { return Response.status(status) .type(MediaType.APPLICATION_JSON) @@ -400,10 +406,25 @@ private Response withBucket(String bucketName, Function worker return worker.apply(bucket); } + private Response withBucket( + String bucketName, String objectName, Function worker) { + Bucket bucket = mockServer.buckets().get(bucketName); + if (bucket == null) { + return bucketNotFound(); + } + + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + + return worker.apply(bucket); + } + private Response withBucketObject( String bucketName, String objectName, Function worker) { return withBucket( bucketName, + objectName, bucket -> { MockObject o = bucket.object().retrieve(objectName); if (o == null) { diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java index b40491a53c6..36a8730c637 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java @@ -91,6 +91,11 @@ public AssumeRoleHandler assumeRoleHandler() { .build()); } + @Value.Default + public AccessCheckHandler accessCheckHandler() { + return (key) -> true; + } + public interface MockServer extends AutoCloseable { URI getS3BaseUri(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java index ff0f3eeb4fe..7e7b87f5fef 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java @@ -266,6 +266,10 @@ public Response batchDeleteObjects( b -> { ImmutableBatchDeleteResponse.Builder response = ImmutableBatchDeleteResponse.builder(); for (S3ObjectIdentifier s3ObjectIdentifier : body.objectsToDelete()) { + if (!mockServer.accessCheckHandler().accessAllowed(s3ObjectIdentifier.key())) { + return accessDenied(); + } + if (b.deleter().delete(s3ObjectIdentifier.key())) { response.addDeletedObjects( ImmutableDeletedS3Object.builder() @@ -308,6 +312,7 @@ public Response deleteObject( @PathParam("bucketName") String bucketName, @PathParam("object") String objectName) { return withBucket( bucketName, + objectName, b -> { b.deleter().delete(objectName); return noContent(); @@ -378,6 +383,7 @@ public Response putObject( ) { return withBucket( bucketName, + objectName, bucket -> { try { @@ -461,6 +467,13 @@ private static Response keyNotFound() { .build(); } + private static Response accessDenied() { + return Response.status(Status.FORBIDDEN) + .type(MediaType.APPLICATION_XML_TYPE) + .entity(ErrorResponse.of("AccessDenied", "Access Denied.")) + .build(); + } + private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } @@ -473,10 +486,25 @@ private Response withBucket(String bucketName, Function worker return worker.apply(bucket); } + private Response withBucket( + String bucketName, String objectName, Function worker) { + Bucket bucket = mockServer.buckets().get(bucketName); + if (bucket == null) { + return bucketNotFound(); + } + + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + + return worker.apply(bucket); + } + private Response withBucketObject( String bucketName, String objectName, Function worker) { return withBucket( bucketName, + objectName, bucket -> { MockObject o = bucket.object().retrieve(objectName); if (o == null) { diff --git a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITDeleteCatalogTasks.java b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITDeleteCatalogTasks.java index 8989f6cdc0d..70b06983f5d 100644 --- a/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITDeleteCatalogTasks.java +++ b/tools/server-admin/src/intTest/java/org/projectnessie/tools/admin/cli/ITDeleteCatalogTasks.java @@ -48,7 +48,7 @@ class ITDeleteCatalogTasks extends AbstractContentTests { } private ObjId storeNewEntry() { - return storeNewEntry(TaskState.failureState("test")); + return storeNewEntry(TaskState.failureState(null, "test")); } private ObjId storeNewEntry(TaskState state) { @@ -58,7 +58,7 @@ private ObjId storeNewEntry(TaskState state) { } private ObjId storeNewEntry(ContentKey key, Content content) { - return storeNewEntry(TaskState.failureState("test"), key, content); + return storeNewEntry(TaskState.failureState(null, "test"), key, content); } private ObjId storeNewEntry(TaskState state, ContentKey key, Content content) { @@ -126,8 +126,10 @@ public void testExpireByKey(QuarkusMainLauncher launcher, Persist persist) public void testExpireByStatus(QuarkusMainLauncher launcher, Persist persist) throws ObjNotFoundException { ObjId id1 = storeNewEntry(TaskState.SUCCESS); - ObjId id2 = storeNewEntry(TaskState.failureState("test2")); - ObjId id3 = storeNewEntry(TaskState.retryableErrorState(Instant.ofEpochMilli(0), "test3")); + ObjId id2 = storeNewEntry(TaskState.failureState(null, "test2")); + ObjId id3 = + storeNewEntry( + TaskState.retryableErrorState(Instant.EPOCH, "test3", null, () -> Instant.EPOCH)); launchNoFile( launcher, "delete-catalog-tasks", "--task-status=SUCCESS", "--task-status=FAILURE");