From f244ff9dc0a4230a7fb30bcda8611b87978593b3 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Thu, 23 May 2024 18:18:16 -0400 Subject: [PATCH] chore: don't reimplement Channels.newOutputStream (#4363) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I9eafa3fa692db20f77b8a526761a8f76f597df50 Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/java-bigtable-hbase/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # ☕️ If you write sample code, please follow the [samples format]( https://togithub.com/GoogleCloudPlatform/java-docs-samples/blob/main/SAMPLE_FORMAT.md). --- .../beam/sequencefiles/SequenceFileSink.java | 56 +------------------ 1 file changed, 2 insertions(+), 54 deletions(-) diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java index 755ad580e6..f7a429b9c5 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java @@ -16,18 +16,13 @@ package com.google.cloud.bigtable.beam.sequencefiles; import com.google.common.collect.Sets; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.Buffer; -import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.io.Compression; import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; -import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; -import org.apache.beam.sdk.io.FileBasedSink.Writer; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.util.MimeTypes; @@ -155,7 +150,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { configuration.setStrings("io.serializations", writeOperation.serializationNames); FSDataOutputStream outputStream = - new FSDataOutputStream(new OutputStreamWrapper(channel), new Statistics("dataflow")); + new FSDataOutputStream(Channels.newOutputStream(channel), new Statistics("dataflow")); sequenceFile = SequenceFile.createWriter( configuration, @@ -181,51 +176,4 @@ public void write(KV value) throws Exception { sequenceFile.append(value.getKey(), value.getValue()); } } - - /** - * Adapter to allow Hadoop's {@link SequenceFile} to write to Beam's {@link WritableByteChannel}. - */ - static class OutputStreamWrapper extends OutputStream { - private final WritableByteChannel inner; - private final ByteBuffer singleByteBuffer = ByteBuffer.allocate(1); - - /** - * Constructs a new {@link OutputStreamWrapper}. - * - * @param inner An instance of Beam's {@link WritableByteChannel}. - */ - OutputStreamWrapper(WritableByteChannel inner) { - this.inner = inner; - } - - /** {@inheritDoc} */ - @Override - public void write(byte[] b, int off, int len) throws IOException { - int written = 0; - - ByteBuffer byteBuffer = ByteBuffer.wrap(b, off, len); - - while (written < len) { - // Workaround Java 9 overridden methods with covariant return types - ((Buffer) byteBuffer).position(written + off); - written += this.inner.write(byteBuffer); - } - } - - /** {@inheritDoc} */ - @Override - public void write(int b) throws IOException { - // Workaround Java 9 overridden methods with covariant return types - ((Buffer) singleByteBuffer).clear(); - singleByteBuffer.put((byte) b); - - int written = 0; - - while (written == 0) { - // Workaround Java 9 overridden methods with covariant return types - ((Buffer) singleByteBuffer).position(0); - written = this.inner.write(singleByteBuffer); - } - } - } }