Skip to content

Commit

Permalink
Add exception mappers to convert storage failures to Iceberg REST cli…
Browse files Browse the repository at this point in the history
…ent exceptions

* Note: before this change a storage 403 error would manifest as 500 on the REST client side.

* Add AccessCheckHandler to ObjectStorageMock for simulating access failures in tests.
  • Loading branch information
dimas-b committed May 22, 2024
1 parent 2513d37 commit 3c14611
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.files.api;

import com.google.common.collect.ImmutableList;
import java.util.Collection;
import java.util.OptionalInt;
import java.util.ServiceLoader;
import org.projectnessie.storage.uri.StorageUri;

public interface ObjectIOExceptionMapper {

OptionalInt toHttpStatusCode(Throwable th);

static StorageFailureException toStorageFailure(StorageUri uri, boolean read, Throwable th) {
if (th instanceof StorageFailureException) {
return (StorageFailureException) th;
}

for (ObjectIOExceptionMapper mapper : Mappers.values) {
OptionalInt status = mapper.toHttpStatusCode(th);
if (status.isPresent()) {
return new StorageFailureException(uri, read, status.getAsInt(), th);
}
}

return new StorageFailureException(uri, read, 500, th);
}

class Mappers {
private static final Collection<ObjectIOExceptionMapper> values;

static {
ImmutableList.Builder<ObjectIOExceptionMapper> list = ImmutableList.builder();
ServiceLoader.load(ObjectIOExceptionMapper.class).forEach(list::add);
values = list.build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.files.api;

import org.projectnessie.storage.uri.StorageUri;

public class StorageFailureException extends NonRetryableException {
private final StorageUri uri;
private final boolean read;
private final int httpStatusCode;

public StorageFailureException(
StorageUri uri, boolean read, int httpStatusCode, Throwable cause) {
super(cause);
this.uri = uri;
this.read = read;
this.httpStatusCode = httpStatusCode;
}

public StorageUri getUri() {
return uri;
}

public boolean isRead() {
return read;
}

public int getHttpStatusCode() {
return httpStatusCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
*/
package org.projectnessie.catalog.files.adls;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.DataLakeFileOutputStreamOptions;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.files.api.StorageFailureException;
import org.projectnessie.storage.uri.StorageUri;

public class AdlsObjectIO implements ObjectIO {
Expand All @@ -34,11 +37,15 @@ public AdlsObjectIO(AdlsClientSupplier clientSupplier) {
}

@Override
public InputStream readObject(StorageUri uri) {
public InputStream readObject(StorageUri uri) throws IOException {
DataLakeFileClient file = clientSupplier.fileClientForLocation(uri);
DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions();
clientSupplier.adlsOptions().readBlockSize().ifPresent(options::setBlockSize);
return file.openInputStream(options).getInputStream();
try {
return file.openInputStream(options).getInputStream();
} catch (BlobStorageException e) {
throw new StorageFailureException(uri, true, e.getStatusCode(), e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2024 Dremio
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.projectnessie.catalog.files.gcs;

import com.google.cloud.storage.StorageException;
import java.util.OptionalInt;
import org.projectnessie.catalog.files.api.ObjectIOExceptionMapper;

/** Extracts storage-side HTTP status code from GCS client exceptions. */
public class GcsExceptionMapper implements ObjectIOExceptionMapper {
// Note: unlike S3 and ADLS, GCS clients can throw access failures from I/O stream methods,
// which is why GCS exceptions have to be mapped by a generic exception mapper as opposed to
// a simple `catch` block in `GcsObjectIO`.

@Override
public OptionalInt toHttpStatusCode(Throwable th) {
while (th != null) {
if (th instanceof StorageException) {
return OptionalInt.of((((StorageException) th).getCode()));
}

th = th.getCause();
}

return OptionalInt.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.time.Clock;
import java.time.Duration;
import org.projectnessie.catalog.files.api.BackendThrottledException;
import org.projectnessie.catalog.files.api.NonRetryableException;
import org.projectnessie.catalog.files.api.ObjectIO;
import org.projectnessie.catalog.files.api.StorageFailureException;
import org.projectnessie.storage.uri.StorageUri;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
Expand Down Expand Up @@ -66,7 +66,8 @@ public InputStream readObject(StorageUri uri) throws IOException {
"S3 throttled",
e);
}
throw new NonRetryableException(e);

throw new StorageFailureException(uri, false, e.statusCode(), e);
}
}

Expand All @@ -82,12 +83,16 @@ public void close() throws IOException {

S3Client s3client = s3clientSupplier.getClient(uri);

s3client.putObject(
PutObjectRequest.builder()
.bucket(uri.requiredAuthority())
.key(uri.requiredPath())
.build(),
RequestBody.fromBytes(toByteArray()));
try {
s3client.putObject(
PutObjectRequest.builder()
.bucket(uri.requiredAuthority())
.key(uri.requiredPath())
.build(),
RequestBody.fromBytes(toByteArray()));
} catch (SdkServiceException e) {
throw new StorageFailureException(uri, false, e.statusCode(), e);
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (C) 2024 Dremio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.projectnessie.catalog.files.gcs.GcsExceptionMapper
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.projectnessie.catalog.service.impl;

import static java.util.Objects.requireNonNull;
import static org.projectnessie.catalog.files.api.ObjectIOExceptionMapper.toStorageFailure;
import static org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg.icebergTableSnapshotToNessie;
import static org.projectnessie.catalog.formats.iceberg.nessie.NessieModelIceberg.icebergViewSnapshotToNessie;
import static org.projectnessie.catalog.service.impl.Util.nessieIdToObjId;
Expand Down Expand Up @@ -112,8 +113,7 @@ private EntitySnapshotObj.Builder importIcebergTable(
}
tableMetadata = IcebergJson.objectMapper().readValue(input, IcebergTableMetadata.class);
} catch (IOException e) {
throw new IOException(
"Failed to read table metadata from " + content.getMetadataLocation(), e);
throw toStorageFailure(metadataLocation, true, e);
}

NessieTable table = entityObjForContent(content, tableMetadata, entityObjId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Locale;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.projectnessie.catalog.files.api.StorageFailureException;
import org.projectnessie.catalog.formats.iceberg.rest.IcebergErrorResponse;
import org.projectnessie.catalog.service.api.CatalogEntityAlreadyExistsException;
import org.projectnessie.error.BaseNessieClientServerException;
Expand Down Expand Up @@ -75,6 +76,8 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) {
body = mapNessieError(e, e.getErrorCode(), e.getErrorDetails(), kind);
} else if (ex instanceof IllegalArgumentException) {
body = errorResponse(400, "IllegalArgumentException", ex.getMessage(), ex);
} else if (ex instanceof StorageFailureException) {
body = mapStorageFailure((StorageFailureException) ex);
}

if (body == null) {
Expand All @@ -86,6 +89,33 @@ public Response toResponse(Throwable ex, IcebergEntityKind kind) {
return Response.status(code == null ? 500 : code).entity(body).build();
}

private static String message(StorageFailureException e) {
return String.format(
"Unable to %s %s due to: %s",
(e.isRead() ? "read" : "write"),
e.getUri(),
(e.getCause() == null ? "unknown cause" : e.getCause()));
}

private IcebergErrorResponse mapStorageFailure(StorageFailureException ex) {
// Log full stack trace on the server side for troubleshooting
LOGGER.info("Propagating storage failure to client: {}", ex, ex);

int httpStatusCode = ex.getHttpStatusCode();
switch (httpStatusCode) {
case 401:
return errorResponse(httpStatusCode, "NotAuthorizedException", message(ex), ex);
case 403:
return errorResponse(httpStatusCode, "ForbiddenException", message(ex), ex);
case 404:
// Convert storage-side "not found" into `IllegalArgumentException`.
// In most cases this results from bad locations in Iceberg metadata.
return errorResponse(400, "IllegalArgumentException", message(ex), ex);
default:
return null;
}
}

private IcebergErrorResponse mapNessieError(
Exception ex, ErrorCode err, NessieErrorDetails errorDetails, IcebergEntityKind kind) {
switch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -99,6 +100,7 @@ void testCreateTableForbidden() {
Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
// Attempts to create files blocked by the session IAM policy break the createTable() call
assertThatThrownBy(() -> catalog.createTable(TableIdentifier.of(ns, "table1"), schema))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining("S3Exception: Access Denied")
// make sure the error comes from the Catalog Server
.hasStackTraceContaining("org.apache.iceberg.rest.RESTClient");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,64 @@
*/
package org.projectnessie.server.catalog;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.iceberg.rest.RESTCatalog;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.projectnessie.client.api.NessieApiV2;
import org.projectnessie.model.CommitMeta;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.objectstoragemock.HeapStorageBucket;
import org.projectnessie.server.catalog.ObjectStorageMockTestResourceLifecycleManager.AccessCheckHandlerHolder;

public abstract class AbstractIcebergCatalogUnitTests extends AbstractIcebergCatalogTests {

HeapStorageBucket heapStorageBucket;
AccessCheckHandlerHolder accessCheckHandler;

@BeforeEach
public void clearBucket() {
heapStorageBucket.clear();
}

@Test
void testReadFailure() throws Exception {
try (RESTCatalog catalog = catalog()) {
TableIdentifier id1 = TableIdentifier.of("ns", "table1");
catalog.createNamespace(id1.namespace());
Table table = catalog.buildTable(id1, SCHEMA).create();

try (NessieApiV2 api = nessieClientBuilder().build(NessieApiV2.class)) {
api.createNamespace().reference(api.getDefaultBranch()).namespace("test-ns").create();
api.commitMultipleOperations()
.commitMeta(CommitMeta.fromMessage("test"))
.branch(api.getDefaultBranch())
.operation(
Operation.Put.of(
ContentKey.of("ns", "table2"),
IcebergTable.of(table.location() + "_test_access_denied_file", 1, 2, 3, 4)))
.operation(
Operation.Put.of(
ContentKey.of("ns", "table3"),
IcebergTable.of(table.location() + "_test_non_existent_file", 1, 2, 3, 4)))
.commit();
}

accessCheckHandler.set(key -> !key.contains("_test_access_denied_"));

assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table2")))
.isInstanceOf(ForbiddenException.class)
.hasMessageContaining("_test_access_denied_file");

assertThatThrownBy(() -> catalog.loadTable(TableIdentifier.of("ns", "table3")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("_test_non_existent_file");
}
}
}
Loading

0 comments on commit 3c14611

Please sign in to comment.