From c37e4ee1b7b66cfea3faa969159d61b4d9f65ecf Mon Sep 17 00:00:00 2001 From: Dmitri Bourlatchkov Date: Sun, 9 Jun 2024 21:15:21 -0400 Subject: [PATCH] Add exception mappers to convert storage failures to Iceberg REST client exceptions * Read path for ObjectIO. * Add AccessCheckHandler to ObjectStorageMock for simulating access failures in tests. * Add exception mappers to convert storage failures to Iceberg REST client exceptions. Closes #8738 --- .../files/api/BackendThrottledException.java | 20 +++--- .../files/api/NonRetryableException.java | 13 +--- .../catalog/files/api/ObjectIOException.java | 35 ++++++++-- .../files/api/ObjectIOExceptionMapper.java | 62 ++++++++++++++++++ .../files/api/ObjectIOInputStream.java | 57 ++++++++++++++++ .../files/api/ObjectIOOutputStream.java | 58 +++++++++++++++++ .../catalog/files/api/RetryableException.java | 21 +++--- .../files/adls/AdlsExceptionMapper.java | 37 +++++++++++ .../catalog/files/adls/AdlsObjectIO.java | 19 ++++-- .../catalog/files/gcs/GcsExceptionMapper.java | 40 ++++++++++++ .../catalog/files/gcs/GcsObjectIO.java | 9 ++- .../catalog/files/s3/S3ExceptionMapper.java | 49 ++++++++++++++ .../catalog/files/s3/S3ObjectIO.java | 65 ++++++++++++------- .../service/impl/ImportSnapshotWorker.java | 13 ++-- .../service/rest/IcebergErrorMapper.java | 31 +++++++++ .../s3/ITS3AssumeRoleIcebergCatalog.java | 2 + .../AbstractIcebergCatalogUnitTests.java | 45 +++++++++++++ ...orageMockTestResourceLifecycleManager.java | 20 ++++++ .../objectstoragemock/AccessCheckHandler.java | 20 ++++++ .../objectstoragemock/AdlsGen2Resource.java | 17 ++++- .../objectstoragemock/GcsResource.java | 8 +++ .../objectstoragemock/ObjectStorageMock.java | 5 ++ .../objectstoragemock/S3Resource.java | 11 ++++ 23 files changed, 586 insertions(+), 71 deletions(-) create mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java create mode 100644 
catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOInputStream.java create mode 100644 catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOOutputStream.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java create mode 100644 catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java create mode 100644 testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java index 09862bb27b2..46af8eca77c 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/BackendThrottledException.java @@ -24,18 +24,22 @@ import java.time.format.DateTimeParseException; import java.util.Optional; import java.util.function.Function; +import org.projectnessie.storage.uri.StorageUri; public class BackendThrottledException extends RetryableException { - public BackendThrottledException(Instant retryNotBefore, Throwable cause) { - super(retryNotBefore, cause); + public BackendThrottledException( + Instant retryNotBefore, String message, StorageUri uri, IOMode ioMode, int httpStatusCode) { + super(retryNotBefore, message, uri, ioMode, httpStatusCode); } - public BackendThrottledException(Instant retryNotBefore, String message) { - super(retryNotBefore, message); - } - - public BackendThrottledException(Instant retryNotBefore, String message, Throwable cause) { - super(retryNotBefore, message, cause); + public BackendThrottledException( + Instant retryNotBefore, + 
String message, + StorageUri uri, + IOMode ioMode, + int httpStatusCode, + Throwable cause) { + super(retryNotBefore, message, uri, ioMode, httpStatusCode, cause); } /** diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java index abd38e27cec..e2a43357bb2 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/NonRetryableException.java @@ -17,18 +17,11 @@ import java.time.Instant; import java.util.Optional; +import org.projectnessie.storage.uri.StorageUri; public class NonRetryableException extends ObjectIOException { - public NonRetryableException(Throwable cause) { - super(cause); - } - - public NonRetryableException(String message) { - super(message); - } - - public NonRetryableException(String message, Throwable cause) { - super(message, cause); + public NonRetryableException(StorageUri uri, IOMode ioMode, int httpStatusCode, Throwable cause) { + super(cause.getMessage(), uri, ioMode, httpStatusCode, cause); } @Override diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java index 45e0d58c30d..368c24a5c5b 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOException.java @@ -18,21 +18,44 @@ import java.io.IOException; import java.time.Instant; import java.util.Optional; +import org.projectnessie.storage.uri.StorageUri; public abstract class ObjectIOException extends IOException { - public ObjectIOException(Throwable cause) { - super(cause); - } + private final StorageUri uri; + private final IOMode ioMode; + private 
final int httpStatusCode; - public ObjectIOException(String message) { - super(message); + public ObjectIOException(String message, StorageUri uri, IOMode ioMode, int httpStatusCode) { + this(message, uri, ioMode, httpStatusCode, null); } - public ObjectIOException(String message, Throwable cause) { + public ObjectIOException( + String message, StorageUri uri, IOMode ioMode, int httpStatusCode, Throwable cause) { super(message, cause); + this.uri = uri; + this.ioMode = ioMode; + this.httpStatusCode = httpStatusCode; } public abstract boolean isRetryable(); public abstract Optional retryNotBefore(); + + public Optional uri() { + return Optional.ofNullable(uri); + } + + public IOMode ioMode() { + return ioMode; + } + + public int httpStatusCode() { + return httpStatusCode; + } + + public enum IOMode { + ACCESS, + READ, + WRITE, + } } diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java new file mode 100644 index 00000000000..dec5dc54ce0 --- /dev/null +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOExceptionMapper.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.catalog.files.api; + +import java.time.Instant; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOException.IOMode; +import org.projectnessie.storage.uri.StorageUri; + +public abstract class ObjectIOExceptionMapper { + + private final StorageUri uri; + private final IOMode ioMode; + + protected ObjectIOExceptionMapper(StorageUri uri, IOMode ioMode) { + this.uri = uri; + this.ioMode = ioMode; + } + + public ObjectIOException toStorageFailure(Throwable th) { + for (Throwable t = th; t != null; t = t.getCause()) { + if (t instanceof ObjectIOException) { + return (ObjectIOException) t; + } + + ObjectIOException mapped = maybeMap(t); + if (mapped != null) { + return mapped; + } + } + + // No throwable in the cause chain could be mapped: fall back to a generic failure + // that keeps the original exception (not the exhausted chain cursor) as its cause. + return new NonRetryableException(uri, ioMode, 500, th); + } + + @Nullable + protected abstract ObjectIOException maybeMap(Throwable th); + + protected NonRetryableException asNonRetryableException(int httpStatusCode, Throwable cause) { + return new NonRetryableException(uri, ioMode, httpStatusCode, cause); + } + + protected BackendThrottledException asThrottledException( + Instant retryNotBefore, String message, int httpStatusCode, Throwable cause) { + return new BackendThrottledException( + retryNotBefore, message, uri, ioMode, httpStatusCode, cause); + } +} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOInputStream.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOInputStream.java new file mode 100644 index 00000000000..6311e87ffbe --- /dev/null +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOInputStream.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.api; + +import java.io.IOException; +import java.io.InputStream; + +public class ObjectIOInputStream extends InputStream { + + private final InputStream in; + private final ObjectIOExceptionMapper mapper; + + public ObjectIOInputStream(InputStream in, ObjectIOExceptionMapper mapper) { + this.in = in; + this.mapper = mapper; + } + + @Override + public int read() throws IOException { + try { + return in.read(); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + try { + return in.read(b, off, len); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } + + @Override + public void close() throws IOException { + try { + in.close(); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } +} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOOutputStream.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOOutputStream.java new file mode 100644 index 00000000000..f037f09872e --- /dev/null +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/ObjectIOOutputStream.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.api; + +import java.io.IOException; +import java.io.OutputStream; +import javax.annotation.Nonnull; + +public class ObjectIOOutputStream extends OutputStream { + + private final OutputStream out; + private final ObjectIOExceptionMapper mapper; + + public ObjectIOOutputStream(OutputStream out, ObjectIOExceptionMapper mapper) { + this.out = out; + this.mapper = mapper; + } + + @Override + public void write(int b) throws IOException { + try { + out.write(b); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } + + @Override + public void write(@Nonnull byte[] b, int off, int len) throws IOException { + try { + out.write(b, off, len); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } + + @Override + public void close() throws IOException { + try { + out.close(); + } catch (IOException e) { + throw mapper.toStorageFailure(e); + } + } +} diff --git a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java index 4d8e4a693b0..b57b46dfa64 100644 --- a/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java +++ b/catalog/files/api/src/main/java/org/projectnessie/catalog/files/api/RetryableException.java @@ -17,22 +17,25 @@ import java.time.Instant; import java.util.Optional; +import org.projectnessie.storage.uri.StorageUri; public class RetryableException extends ObjectIOException { private final Instant 
retryNotBefore; - public RetryableException(Instant retryNotBefore, Throwable cause) { - super(cause); + public RetryableException( + Instant retryNotBefore, String message, StorageUri uri, IOMode ioMode, int httpStatusCode) { + super(message, uri, ioMode, httpStatusCode); this.retryNotBefore = retryNotBefore; } - public RetryableException(Instant retryNotBefore, String message) { - super(message); - this.retryNotBefore = retryNotBefore; - } - - public RetryableException(Instant retryNotBefore, String message, Throwable cause) { - super(message, cause); + public RetryableException( + Instant retryNotBefore, + String message, + StorageUri uri, + IOMode ioMode, + int httpStatusCode, + Throwable cause) { + super(message, uri, ioMode, httpStatusCode, cause); this.retryNotBefore = retryNotBefore; } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java new file mode 100644 index 00000000000..904d4776f2f --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsExceptionMapper.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.catalog.files.adls; + +import com.azure.storage.blob.models.BlobStorageException; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOException; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import org.projectnessie.storage.uri.StorageUri; + +public class AdlsExceptionMapper extends ObjectIOExceptionMapper { + protected AdlsExceptionMapper(StorageUri uri, ObjectIOException.IOMode ioMode) { + super(uri, ioMode); + } + + @Nullable + @Override + protected ObjectIOException maybeMap(Throwable th) { + if (th instanceof BlobStorageException) { + return asNonRetryableException(((BlobStorageException) th).getStatusCode(), th); + } + return null; + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java index b1a9fc665e2..0e27f6de59c 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/adls/AdlsObjectIO.java @@ -20,9 +20,13 @@ import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions; import com.azure.storage.file.datalake.options.DataLakeFileOutputStreamOptions; import java.io.BufferedOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOException.IOMode; +import org.projectnessie.catalog.files.api.ObjectIOInputStream; +import org.projectnessie.catalog.files.api.ObjectIOOutputStream; import org.projectnessie.storage.uri.StorageUri; public class AdlsObjectIO implements ObjectIO { @@ -34,20 +38,27 @@ public AdlsObjectIO(AdlsClientSupplier clientSupplier) { } @Override - public InputStream readObject(StorageUri uri) { + public InputStream readObject(StorageUri uri) 
throws IOException { DataLakeFileClient file = clientSupplier.fileClientForLocation(uri); DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions(); clientSupplier.adlsOptions().readBlockSize().ifPresent(options::setBlockSize); - return file.openInputStream(options).getInputStream(); + AdlsExceptionMapper mapper = new AdlsExceptionMapper(uri, IOMode.READ); + try { + return new ObjectIOInputStream(file.openInputStream(options).getInputStream(), mapper); + } catch (Exception e) { + throw mapper.toStorageFailure(e); + } } @Override - public OutputStream writeObject(StorageUri uri) { + public OutputStream writeObject(StorageUri uri) throws IOException { DataLakeFileClient file = clientSupplier.fileClientForLocation(uri); DataLakeFileOutputStreamOptions options = new DataLakeFileOutputStreamOptions(); ParallelTransferOptions transferOptions = new ParallelTransferOptions(); clientSupplier.adlsOptions().writeBlockSize().ifPresent(transferOptions::setBlockSizeLong); options.setParallelTransferOptions(transferOptions); - return new BufferedOutputStream(file.getOutputStream(options)); + AdlsExceptionMapper mapper = new AdlsExceptionMapper(uri, IOMode.WRITE); + return new ObjectIOOutputStream( + new BufferedOutputStream(file.getOutputStream(options)), mapper); } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java new file mode 100644 index 00000000000..82ae529f057 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsExceptionMapper.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.gcs; + +import com.google.cloud.storage.StorageException; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOException; +import org.projectnessie.catalog.files.api.ObjectIOException.IOMode; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import org.projectnessie.storage.uri.StorageUri; + +/** Extracts storage-side HTTP status code from GCS client exceptions. */ +public class GcsExceptionMapper extends ObjectIOExceptionMapper { + + protected GcsExceptionMapper(StorageUri uri, IOMode ioMode) { + super(uri, ioMode); + } + + @Nullable + @Override + protected ObjectIOException maybeMap(Throwable th) { + if (th instanceof StorageException) { + return asNonRetryableException(((StorageException) th).getCode(), th); + } + return null; + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java index 9b9c2d38602..cba7b93d083 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/gcs/GcsObjectIO.java @@ -30,6 +30,9 @@ import java.util.ArrayList; import java.util.List; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOException.IOMode; +import org.projectnessie.catalog.files.api.ObjectIOInputStream; +import 
org.projectnessie.catalog.files.api.ObjectIOOutputStream; import org.projectnessie.catalog.secrets.KeySecret; import org.projectnessie.storage.uri.StorageUri; @@ -58,7 +61,8 @@ public InputStream readObject(StorageUri uri) { BlobId.of(location.bucket(), location.path()), sourceOptions.toArray(new BlobSourceOption[0])); bucketOptions.readChunkSize().ifPresent(reader::setChunkSize); - return Channels.newInputStream(reader); + GcsExceptionMapper mapper = new GcsExceptionMapper(uri, IOMode.READ); + return new ObjectIOInputStream(Channels.newInputStream(reader), mapper); } @Override @@ -79,6 +83,7 @@ public OutputStream writeObject(StorageUri uri) { BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(location.bucket(), location.path())).build(); WriteChannel channel = client.writer(blobInfo, writeOptions.toArray(new BlobWriteOption[0])); bucketOptions.writeChunkSize().ifPresent(channel::setChunkSize); - return Channels.newOutputStream(channel); + GcsExceptionMapper mapper = new GcsExceptionMapper(uri, IOMode.WRITE); + return new ObjectIOOutputStream(Channels.newOutputStream(channel), mapper); } } diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java new file mode 100644 index 00000000000..8682d16ca22 --- /dev/null +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ExceptionMapper.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.catalog.files.s3; + +import java.time.Instant; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.projectnessie.catalog.files.api.ObjectIOException; +import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper; +import org.projectnessie.storage.uri.StorageUri; +import software.amazon.awssdk.core.exception.SdkServiceException; + +public class S3ExceptionMapper extends ObjectIOExceptionMapper { + private final Supplier retryTimeSupplier; + + protected S3ExceptionMapper( + StorageUri uri, ObjectIOException.IOMode ioMode, Supplier retryTimeSupplier) { + super(uri, ioMode); + this.retryTimeSupplier = retryTimeSupplier; + } + + @Nullable + @Override + protected ObjectIOException maybeMap(Throwable th) { + if (th instanceof SdkServiceException) { + SdkServiceException e = (SdkServiceException) th; + if (e.isThrottlingException()) { + return asThrottledException(retryTimeSupplier.get(), "S3 throttled", e.statusCode(), e); + } + + return asNonRetryableException(e.statusCode(), e); + } + + return null; + } +} diff --git a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java index 97db347e0ed..988bb459943 100644 --- a/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java +++ b/catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/S3ObjectIO.java @@ -25,14 +25,16 @@ import java.io.OutputStream; import java.time.Clock; import java.time.Duration; -import org.projectnessie.catalog.files.api.BackendThrottledException; -import org.projectnessie.catalog.files.api.NonRetryableException; import org.projectnessie.catalog.files.api.ObjectIO; +import org.projectnessie.catalog.files.api.ObjectIOException.IOMode; +import 
org.projectnessie.catalog.files.api.ObjectIOInputStream; import org.projectnessie.storage.uri.StorageUri; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkServiceException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; public class S3ObjectIO implements ObjectIO { @@ -53,22 +55,25 @@ public InputStream readObject(StorageUri uri) throws IOException { S3Client s3client = s3clientSupplier.getClient(uri); + S3ExceptionMapper mapper = + new S3ExceptionMapper( + uri, + IOMode.READ, + () -> + clock + .instant() + .plus( + s3clientSupplier.s3config().retryAfter().orElse(Duration.of(10, SECONDS)))); try { - return s3client.getObject( - GetObjectRequest.builder() - .bucket(uri.requiredAuthority()) - .key(withoutLeadingSlash(uri)) - .build()); + ResponseInputStream responseStream = + s3client.getObject( + GetObjectRequest.builder() + .bucket(uri.requiredAuthority()) + .key(withoutLeadingSlash(uri)) + .build()); + return new ObjectIOInputStream(responseStream, mapper); } catch (SdkServiceException e) { - if (e.isThrottlingException()) { - throw new BackendThrottledException( - clock - .instant() - .plus(s3clientSupplier.s3config().retryAfter().orElse(Duration.of(10, SECONDS))), - "S3 throttled", - e); - } - throw new NonRetryableException(e); + throw mapper.toStorageFailure(e); } } @@ -84,12 +89,28 @@ public void close() throws IOException { S3Client s3client = s3clientSupplier.getClient(uri); - s3client.putObject( - PutObjectRequest.builder() - .bucket(uri.requiredAuthority()) - .key(withoutLeadingSlash(uri)) - .build(), - RequestBody.fromBytes(toByteArray())); + S3ExceptionMapper mapper = + new S3ExceptionMapper( + uri, + IOMode.WRITE, + () -> + clock + .instant() + 
.plus( + s3clientSupplier + .s3config() + .retryAfter() + .orElse(Duration.of(10, SECONDS)))); + try { + s3client.putObject( + PutObjectRequest.builder() + .bucket(uri.requiredAuthority()) + .key(withoutLeadingSlash(uri)) + .build(), + RequestBody.fromBytes(toByteArray())); + } catch (SdkServiceException e) { + throw mapper.toStorageFailure(e); + } } }; } diff --git a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java index 8af527de0b9..00973fa8a0c 100644 --- a/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java +++ b/catalog/service/impl/src/main/java/org/projectnessie/catalog/service/impl/ImportSnapshotWorker.java @@ -107,16 +107,11 @@ private EntitySnapshotObj.Builder importIcebergTable( if (snapshot == null) { IcebergTableMetadata tableMetadata; StorageUri metadataLocation = StorageUri.of(content.getMetadataLocation()); - try { - InputStream input = taskRequest.objectIO().readObject(metadataLocation); - if (metadataLocation.requiredPath().endsWith(".gz")) { - input = new GZIPInputStream(input); - } - tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class); - } catch (IOException e) { - throw new IOException( - "Failed to read table metadata from " + content.getMetadataLocation(), e); + InputStream input = taskRequest.objectIO().readObject(metadataLocation); + if (metadataLocation.requiredPath().endsWith(".gz")) { + input = new GZIPInputStream(input); } + tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class); NessieTable table = entityObjForContent(content, tableMetadata, entityObjId); diff --git a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java index 
51140966f13..866f8939013 100644 --- a/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java +++ b/catalog/service/rest/src/main/java/org/projectnessie/catalog/service/rest/IcebergErrorMapper.java @@ -30,6 +30,7 @@ import java.util.Locale; import java.util.concurrent.CompletionException; import java.util.stream.Collectors; +import org.projectnessie.catalog.files.api.ObjectIOException; import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse; import org.projectnessie.catalog.formats.iceberg.rest.IcebergException; import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException; @@ -42,6 +43,7 @@ import org.projectnessie.model.Conflict; import org.projectnessie.model.ContentKey; import org.projectnessie.services.config.ExceptionConfig; +import org.projectnessie.storage.uri.StorageUri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +80,8 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) { body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex); } else if (ex instanceof IcebergException) { body = ((IcebergException) ex).toErrorResponse(); + } else if (ex instanceof ObjectIOException) { + body = mapStorageFailure((ObjectIOException) ex); } if (body == null) { @@ -89,6 +93,33 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) { return Response.status(code == null ? 500 : code).entity(body).build(); } + private static String message(ObjectIOException e) { + return String.format( + "Unable to %s %s due to: %s", + (e.ioMode().name().toLowerCase(Locale.ROOT)), + e.uri().map(StorageUri::location).orElse("[unknown location]"), + (e.getCause() == null ? 
"[unknown cause]" : e.getCause())); + } + + private IcebergErrorResponse mapStorageFailure(ObjectIOException ex) { + // Log full stack trace on the server side for troubleshooting + LOGGER.info("Propagating storage failure to client: {}", ex, ex); + + int httpStatusCode = ex.httpStatusCode(); + switch (httpStatusCode) { + case 401: + return errorResponse(httpStatusCode, "NotAuthorizedException", message(ex), ex); + case 403: + return errorResponse(httpStatusCode, "ForbiddenException", message(ex), ex); + case 404: + // Convert storage-side "not found" into `IllegalArgumentException`. + // In most cases this results from bad locations in Iceberg metadata. + return errorResponse(400, "IllegalArgumentException", message(ex), ex); + default: + return null; + } + } + private IcebergErrorResponse mapNessieError( Exception ex, ErrorCode err, NessieErrorDetails errorDetails, IcebergEntityKind kind) { switch (err) { diff --git a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java index 6161e3bdc47..b9d0f878be1 100644 --- a/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java +++ b/servers/quarkus-server/src/intTest/java/org/projectnessie/server/catalog/s3/ITS3AssumeRoleIcebergCatalog.java @@ -27,6 +27,7 @@ import org.apache.iceberg.aws.AwsClientProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.AfterAll; @@ -97,6 +98,7 @@ void testCreateTableForbidden() { Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); // Attempts to create files blocked by the session IAM policy break the createTable() 
call assertThatThrownBy(() -> catalog.createTable(TableIdentifier.of(ns, "table1"), schema)) + .isInstanceOf(ForbiddenException.class) .hasMessageContaining("S3Exception: Access Denied") // make sure the error comes from the Catalog Server .hasStackTraceContaining("org.apache.iceberg.rest.RESTClient"); diff --git a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java index 78a5f7f8022..bd1efea0e9d 100644 --- a/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java +++ b/servers/quarkus-server/src/test/java/org/projectnessie/server/catalog/AbstractIcebergCatalogUnitTests.java @@ -17,6 +17,7 @@ import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.InstanceOfAssertFactories.STRING; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -35,17 +36,26 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.IntStream; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.RESTSerializers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.projectnessie.client.api.NessieApiV2; +import org.projectnessie.model.CommitMeta; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; import org.projectnessie.objectstoragemock.HeapStorageBucket; +import org.projectnessie.server.catalog.ObjectStorageMockTestResourceLifecycleManager.AccessCheckHandlerHolder; public abstract 
class AbstractIcebergCatalogUnitTests extends AbstractIcebergCatalogTests { HeapStorageBucket heapStorageBucket; + AccessCheckHandlerHolder accessCheckHandler; @BeforeEach public void clearBucket() { @@ -205,4 +215,39 @@ public void namespacesPaging() throws Exception { .mapToObj(i -> TableIdentifier.of("namespace_0", "view_" + i)) .toList()); } + + @Test + void testReadFailure() throws Exception { + try (RESTCatalog catalog = catalog()) { + TableIdentifier id1 = TableIdentifier.of("ns", "table1"); + catalog.createNamespace(id1.namespace()); + Table table = catalog.buildTable(id1, SCHEMA).create(); + + try (NessieApiV2 api = nessieClientBuilder().build(NessieApiV2.class)) { + api.createNamespace().reference(api.getDefaultBranch()).namespace("test-ns").create(); + api.commitMultipleOperations() + .commitMeta(CommitMeta.fromMessage("test")) + .branch(api.getDefaultBranch()) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table2"), + IcebergTable.of(table.location() + "_test_access_denied_file", 1, 2, 3, 4))) + .operation( + Operation.Put.of( + ContentKey.of("ns", "table3"), + IcebergTable.of(table.location() + "_test_non_existent_file", 1, 2, 3, 4))) + .commit(); + } + + accessCheckHandler.set(key -> !key.contains("_test_access_denied_")); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table2"))) + .isInstanceOf(ForbiddenException.class) + .hasMessageContaining("_test_access_denied_file"); + + assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table3"))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("_test_non_existent_file"); + } + } } diff --git a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java index 14180e6029e..6d189cac0ed 100644 --- 
a/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java +++ b/servers/quarkus-server/src/testFixtures/java/org/projectnessie/server/catalog/ObjectStorageMockTestResourceLifecycleManager.java @@ -19,6 +19,7 @@ import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.projectnessie.objectstoragemock.AccessCheckHandler; import org.projectnessie.objectstoragemock.HeapStorageBucket; import org.projectnessie.objectstoragemock.ObjectStorageMock; import org.projectnessie.objectstoragemock.ObjectStorageMock.MockServer; @@ -45,6 +46,7 @@ public static String bucketWarehouseLocation(String scheme) { "ObjectStorageMockTestResourceLifecycleManager.initAddress"; private final AssumeRoleHandlerHolder assumeRoleHandler = new AssumeRoleHandlerHolder(); + private final AccessCheckHandlerHolder accessCheckHandler = new AccessCheckHandlerHolder(); private HeapStorageBucket heapStorageBucket; private MockServer server; @@ -58,6 +60,7 @@ public Map start() { .initAddress("localhost") .putBuckets(BUCKET, heapStorageBucket.bucket()) .assumeRoleHandler(assumeRoleHandler) + .accessCheckHandler(accessCheckHandler) .build() .start(); @@ -90,6 +93,9 @@ public void inject(TestInjector testInjector) { testInjector.injectIntoFields( assumeRoleHandler, new TestInjector.MatchesType(AssumeRoleHandlerHolder.class)); + + testInjector.injectIntoFields( + accessCheckHandler, new TestInjector.MatchesType(AccessCheckHandlerHolder.class)); } @Override @@ -135,4 +141,18 @@ public AssumeRoleResult assumeRole( serialNumber); } } + + public static final class AccessCheckHandlerHolder implements AccessCheckHandler { + private final AtomicReference handler = new AtomicReference<>(); + + public void set(AccessCheckHandler handler) { + this.handler.set(handler); + } + + @Override + public boolean accessAllowed(String objectKey) { + 
AccessCheckHandler delegate = handler.get(); + return delegate == null || delegate.accessAllowed(objectKey); + } + } } diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java new file mode 100644 index 00000000000..78f85527216 --- /dev/null +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AccessCheckHandler.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.objectstoragemock; + +public interface AccessCheckHandler { + boolean accessAllowed(String objectKey); +} diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java index b59f1e921b0..1592c3a5aab 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/AdlsGen2Resource.java @@ -84,6 +84,7 @@ public Response create( return withFilesystem( filesystem, + normalizedPath, b -> { Bucket.ObjectUpdater updater = b.updater().update(normalizedPath, Bucket.UpdaterMode.CREATE_NEW); @@ -116,6 +117,7 @@ public Response update( return withFilesystem( filesystem, + normalizedPath, b -> { if (!action.appendOrFlush()) { return notImplemented(); @@ -151,6 +153,7 @@ public Response read( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -194,6 +197,7 @@ public Response getProperties( return withFilesystem( filesystem, + normalizedPath, b -> { MockObject obj = b.object().retrieve(normalizedPath); if (obj == null) { @@ -225,6 +229,7 @@ public Response delete( return withFilesystem( filesystem, + normalizedPath, b -> { if (recursive) { try (Stream listStream = @@ -262,6 +267,7 @@ public Response list( return withFilesystem( filesystem, + normalizedPath, b -> { try (Stream listStream = b.lister().list(normalizedPath, continuationToken)) { @@ -349,6 +355,10 @@ private static Response keyNotFound() { Status.NOT_FOUND, "PathNotFound", "The specified path does not exist."); } + private static Response accessDenied() { + return dataLakeStorageError(Status.FORBIDDEN, "Forbidden", "Access Denied."); + } + private static Response dataLakeStorageError(Status status, String code, String 
message) { return Response.status(status) .header("x-ms-error-code", code) @@ -361,7 +371,12 @@ private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } - private Response withFilesystem(String filesystem, Function worker) { + private Response withFilesystem( + String filesystem, String path, Function worker) { + if (!mockServer.accessCheckHandler().accessAllowed(path)) { + return accessDenied(); + } + Bucket bucket = mockServer.buckets().get(filesystem); if (bucket == null) { return bucketNotFound(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java index 5ac68c901a9..c4400a4c92f 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/GcsResource.java @@ -380,6 +380,10 @@ private static Response keyNotFound() { return errorResponse(Status.NOT_FOUND, "The specified key does not exist."); } + private static Response accessDenied() { + return errorResponse(Status.FORBIDDEN, "Access Denied."); + } + private static Response errorResponse(Status status, String message) { return Response.status(status) .type(MediaType.APPLICATION_JSON) @@ -405,6 +409,10 @@ private Response withBucketObject( return withBucket( bucketName, bucket -> { + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + MockObject o = bucket.object().retrieve(objectName); if (o == null) { return keyNotFound(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java index b40491a53c6..36a8730c637 100644 --- 
a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/ObjectStorageMock.java @@ -91,6 +91,11 @@ public AssumeRoleHandler assumeRoleHandler() { .build()); } + @Value.Default + public AccessCheckHandler accessCheckHandler() { + return (key) -> true; + } + public interface MockServer extends AutoCloseable { URI getS3BaseUri(); diff --git a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java index ff0f3eeb4fe..96ba22b1e43 100644 --- a/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java +++ b/testing/object-storage-mock/src/main/java/org/projectnessie/objectstoragemock/S3Resource.java @@ -461,6 +461,13 @@ private static Response keyNotFound() { .build(); } + private static Response accessDenied() { + return Response.status(Status.FORBIDDEN) + .type(MediaType.APPLICATION_XML_TYPE) + .entity(ErrorResponse.of("AccessDenied", "Access Denied.")) + .build(); + } + private static Response notImplemented() { return Response.status(Status.NOT_IMPLEMENTED).build(); } @@ -478,6 +485,10 @@ private Response withBucketObject( return withBucket( bucketName, bucket -> { + if (!mockServer.accessCheckHandler().accessAllowed(objectName)) { + return accessDenied(); + } + MockObject o = bucket.object().retrieve(objectName); if (o == null) { return keyNotFound();