[SPARK-44316][BUILD] Upgrade Jersey to 2.40
### What changes were proposed in this pull request?
This PR upgrades Jersey from 2.36 to 2.40.

### Why are the changes needed?
1. This version adopts ASM 9.5, which Spark also uses currently:
[Adopt ASM 9.5](eclipse-ee4j/jersey#5305)

2. It also fixes some bugs, e.g.:
[Fix possible NPE in netty client](eclipse-ee4j/jersey#5330)
[Get media type fix](eclipse-ee4j/jersey#5282)

3. It includes a security vulnerability fix:
[CVE for dependency jackson-databind](eclipse-ee4j/jersey#5225)

4. Full release notes:
https://github.com/eclipse-ee4j/jersey/releases/tag/2.40
https://github.com/eclipse-ee4j/jersey/releases/tag/2.39
https://github.com/eclipse-ee4j/jersey/releases/tag/2.38
https://github.com/eclipse-ee4j/jersey/releases/tag/2.37

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #41874 from panbingkun/SPARK-44316.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
a0x8o committed Jul 6, 2023
1 parent 116479c commit 52142fe
Showing 455 changed files with 22,050 additions and 3,161 deletions.
10 changes: 8 additions & 2 deletions R/pkg/NAMESPACE
@@ -77,13 +77,19 @@ exportMethods("glm",
"spark.lm",
"spark.fmRegressor")

# Job group lifecycle management methods
# Job group and job tag lifecycle management methods
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup",
"setJobDescription",
"setInterruptOnCancel",
"setLocalProperty",
"getLocalProperty")
"getLocalProperty",
"addJobTag",
"removeJobTag",
"getJobTags",
"clearJobTags",
"cancelJobsWithTag")

# Export Utility methods
export("setLogLevel")
98 changes: 98 additions & 0 deletions R/pkg/R/sparkR.R
@@ -542,6 +542,104 @@ cancelJobGroup <- function(groupId) {
invisible(callJMethod(sc, "cancelJobGroup", groupId))
}

#' Set the behavior of job cancellation for jobs started in this thread.
#'
#' @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
#' being called on the job's executor threads. This is useful to help ensure that the tasks
#' are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
#' may respond to Thread.interrupt() by marking nodes as dead.
#' @rdname setInterruptOnCancel
#' @name setInterruptOnCancel
#' @examples
#'\dontrun{
#' sparkR.session()
#' setInterruptOnCancel(TRUE)
#'}
#' @note setInterruptOnCancel since 3.5.0
setInterruptOnCancel <- function(interruptOnCancel) {
sc <- getSparkContext()
invisible(callJMethod(sc, "setInterruptOnCancel", interruptOnCancel))
}

#' Add a tag to be assigned to all the jobs started by this thread.
#'
#' @param tag The tag to be added. Cannot contain ',' (comma) character.
#' @rdname addJobTag
#' @name addJobTag
#' @examples
#'\dontrun{
#' sparkR.session()
#' addJobTag("myJobTag")
#'}
#' @note addJobTag since 3.5.0
addJobTag <- function(tag) {
sc <- getSparkContext()
invisible(callJMethod(sc, "addJobTag", tag))
}

#' Remove a tag previously added to be assigned to all the jobs started by this thread.
#' Noop if such a tag was not added earlier.
#'
#' @param tag The tag to be removed. Cannot contain ',' (comma) character.
#' @rdname removeJobTag
#' @name removeJobTag
#' @examples
#'\dontrun{
#' sparkR.session()
#' removeJobTag("myJobTag")
#'}
#' @note removeJobTag since 3.5.0
removeJobTag <- function(tag) {
sc <- getSparkContext()
invisible(callJMethod(sc, "removeJobTag", tag))
}

#' Get the tags that are currently set to be assigned to all the jobs started by this thread.
#'
#' @rdname getJobTags
#' @name getJobTags
#' @examples
#'\dontrun{
#' sparkR.session()
#' tags <- getJobTags()
#'}
#' @note getJobTags since 3.5.0
getJobTags <- function() {
sc <- getSparkContext()
callJStatic("org.apache.spark.api.r.RUtils", "getJobTags", sc)
}

#' Clear the current thread's job tags.
#'
#' @rdname clearJobTags
#' @name clearJobTags
#' @examples
#'\dontrun{
#' sparkR.session()
#' clearJobTags()
#'}
#' @note clearJobTags since 3.5.0
clearJobTags <- function() {
sc <- getSparkContext()
invisible(callJMethod(sc, "clearJobTags"))
}

#' Cancel active jobs that have the specified tag.
#'
#' @param tag The tag whose jobs are to be cancelled. Cannot contain ',' (comma) character.
#' @rdname cancelJobsWithTag
#' @name cancelJobsWithTag
#' @examples
#'\dontrun{
#' sparkR.session()
#' cancelJobsWithTag("myTag")
#'}
#' @note cancelJobsWithTag since 3.5.0
cancelJobsWithTag <- function(tag) {
sc <- getSparkContext()
invisible(callJMethod(sc, "cancelJobsWithTag", tag))
}

#' Set a human readable description of the current job.
#'
#' Set a description that is shown as a job description in UI.
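The five SparkR wrappers above are thin shims that forward to the JVM via `callJMethod`, so the same job-tag lifecycle is available directly on the Spark 3.5 `SparkContext`. A minimal sketch from the Java side, assuming the `addJobTag`/`removeJobTag`/`clearJobTags`/`cancelJobsWithTag` methods the wrappers invoke (method names are taken from the calls above; treat the exact JVM entry point as an assumption):

```java
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;

public class JobTagSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    SparkContext sc = spark.sparkContext();

    sc.addJobTag("nightly-etl");          // every job started by this thread now carries the tag
    try {
      spark.range(0, 1_000_000).count();  // runs as a tagged job
    } finally {
      sc.removeJobTag("nightly-etl");     // or sc.clearJobTags()
    }

    // From any other thread, all active jobs carrying the tag can be cancelled at once:
    sc.cancelJobsWithTag("nightly-etl");
    spark.stop();
  }
}
```

As with `setJobGroup`, the tags apply to jobs started by the calling thread, which is why the R API mirrors the job-group functions exported in the NAMESPACE change above.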
6 changes: 6 additions & 0 deletions R/pkg/pkgdown/_pkgdown_template.yml
@@ -289,13 +289,19 @@ reference:
- title: "Spark Session and Context"
- contents:
- cancelJobGroup
- cancelJobsWithTag
- clearCache
- clearJobGroup
- getLocalProperty
- install.spark
- setCheckpointDir
- setJobDescription
- setInterruptOnCancel
- setJobGroup
- addJobTag
- removeJobTag
- getJobTags
- clearJobTags
- setLocalProperty
- setLogLevel
- spark.addFile
16 changes: 16 additions & 0 deletions R/pkg/tests/fulltests/test_context.R
@@ -95,6 +95,22 @@ test_that("job group functions can be called", {
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
clearJobGroup()
setInterruptOnCancel(TRUE)

sparkR.session.stop()
expect_true(TRUE)
})

test_that("job tag functions can be called", {
sc <- sparkR.sparkContext(master = sparkRTestMaster)
addJobTag("B")
clearJobTags()
expect_true(identical(getJobTags(), list()))
addJobTag("A")
expect_true(identical(getJobTags(), list("A")))
removeJobTag("A")
expect_true(identical(getJobTags(), list()))
cancelJobsWithTag("A")

sparkR.session.stop()
expect_true(TRUE)
2 changes: 1 addition & 1 deletion assembly/README
@@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command

If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
-Dhadoop.version=3.3.5
-Dhadoop.version=3.3.6
2 changes: 1 addition & 1 deletion build/mvn
@@ -92,7 +92,7 @@ install_app() {
if [ -f "${local_checksum}" ]; then
echo " ${local_tarball}" >> ${local_checksum} # two spaces + file are important!
# Assuming SHA512 here for now
echo "Veryfing checksum from ${local_checksum}" 1>&2
echo "Verifying checksum from ${local_checksum}" 1>&2
if ! shasum -a 512 -c "${local_checksum}" > /dev/null ; then
echo "Bad checksum from ${remote_checksum}"
exit 2
Original file line number Diff line number Diff line change
@@ -245,12 +245,13 @@ TransportClient createClient(InetSocketAddress address)
logger.debug("Creating new connection to {}", address);

Bootstrap bootstrap = new Bootstrap();
int connCreateTimeout = conf.connectionCreationTimeoutMs();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
.option(ChannelOption.ALLOCATOR, pooledAllocator);

if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public void initChannel(SocketChannel ch) {
// Connect to the remote server
long preConnect = System.nanoTime();
ChannelFuture cf = bootstrap.connect(address);
if (!cf.await(conf.connectionCreationTimeoutMs())) {

if (connCreateTimeout <= 0) {
cf.awaitUninterruptibly();
assert cf.isDone();
if (cf.isCancelled()) {
throw new IOException(String.format("Connecting to %s cancelled", address));
} else if (!cf.isSuccess()) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
} else if (!cf.await(connCreateTimeout)) {
throw new IOException(
String.format("Connecting to %s timed out (%s ms)",
address, conf.connectionCreationTimeoutMs()));
address, connCreateTimeout));
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
Original file line number Diff line number Diff line change
@@ -103,15 +103,15 @@ public int connectionTimeoutMs() {
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
}

/** Connect creation timeout in milliseconds. Default 30 secs. */
public int connectionCreationTimeoutMs() {
long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
}

/** Number of concurrent connections between two nodes for fetching data. */
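Combined with the `createClient` change above, a negative `connectionTimeout`/`connectionCreationTimeout` setting is now clamped to `0`, and `0` makes the factory wait on the connect future without a deadline instead of passing a negative value through. A minimal sketch of the resulting configuration behavior, reusing the `TransportConf` and `MapConfigProvider` classes exercised by the test suite below (the printed values are expectations under that assumption, not verified output):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class UnlimitedTimeoutSketch {
  public static void main(String[] args) {
    Map<String, String> configMap = new HashMap<>();
    configMap.put("spark.shuffle.io.connectionTimeout", "-1");          // request "unlimited"
    configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");  // request "unlimited"
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));

    // Both getters now clamp the negative result to 0 rather than returning a negative int;
    // the client factory treats 0 as "wait indefinitely for the connection to be created".
    System.out.println(conf.connectionTimeoutMs());          // expected: 0
    System.out.println(conf.connectionCreationTimeoutMs());  // expected: 0
  }
}
```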
Original file line number Diff line number Diff line change
@@ -31,10 +31,6 @@
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;

import org.apache.spark.network.TestUtils;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;

import static org.junit.Assert.*;

public class TransportClientFactorySuite {
private TransportConf conf;
private TransportContext context;
@@ -237,4 +235,31 @@ public void fastFailConnectionInTimeWindow() {
Assert.assertThrows("fail this connection directly", IOException.class,
() -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
}

@Test
public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException {
Map<String, String> configMap = new HashMap<>();
configMap.put("spark.shuffle.io.connectionTimeout", "-1");
configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
RpcHandler rpcHandler = new NoOpRpcHandler();
try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
TransportClientFactory factory = ctx.createClientFactory()){
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
assertTrue(c1.isActive());
long expiredTime = System.currentTimeMillis() + 5000;
while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
Thread.sleep(10);
}
assertTrue(c1.isActive());
// Even when connectionCreationTimeout is unlimited, connecting to an unreachable
// server should still fail rather than hang.
TransportServer server = ctx.createServer();
int unreachablePort = server.getPort();
JavaUtils.closeQuietly(server);
IOException exception = Assert.assertThrows(IOException.class,
() -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
assertNotEquals(exception.getCause(), null);
}
}
}
Original file line number Diff line number Diff line change
@@ -328,6 +328,10 @@ public MergedBlockMeta getMergedBlockMeta(
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
if (numChunks <= 0) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s is empty", indexFile.getPath()));
}
File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
if (!metaFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle meta file %s not found",
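The merged-shuffle index file stores one 8-byte offset per chunk plus a leading zero offset, so the chunk count is derived from the file size; an index that only ever received the zero offset therefore yields zero chunks and is now rejected up front instead of producing an empty block meta. A small sketch of that arithmetic (variable names are illustrative, not the resolver's own):

```java
// Index layout: [0, endOfChunk1, endOfChunk2, ...] written as 8-byte longs.
long indexFileLength = 8;                                  // only the zero offset was written
int numChunks = (int) (indexFileLength / Long.BYTES) - 1;  // (8 / 8) - 1 = 0
if (numChunks <= 0) {
  // Before this change an empty index produced a MergedBlockMeta with 0 chunks;
  // callers now get an explicit error, which the two test updates below assert on ("is empty").
  throw new RuntimeException("Merged shuffle index file is empty");
}
```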
Original file line number Diff line number Diff line change
@@ -289,8 +289,9 @@ public void testFailureAfterData() throws IOException {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
RuntimeException e = assertThrows(RuntimeException.class,
() -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}

@@ -304,8 +305,9 @@ public void testFailureAfterMultipleDataBlocks() throws IOException {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
RuntimeException e = assertThrows(RuntimeException.class,
() -> pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0));
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(9, 0, 0, 0, 0, 0, 9);
}

7 changes: 7 additions & 0 deletions common/unsafe/pom.xml
@@ -38,6 +38,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-common-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!--
Original file line number Diff line number Diff line change
@@ -68,14 +68,9 @@ public final class Platform {
}
try {
Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> constructor;
try {
constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
} catch (NoSuchMethodException e) {
// DirectByteBuffer(long,int) was removed in
// https://github.com/openjdk/jdk/commit/a56598f5a534cc9223367e7faa8433ea38661db9
constructor = cls.getDeclaredConstructor(Long.TYPE, Long.TYPE);
}
Constructor<?> constructor = (majorVersion < 21) ?
cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE) :
cls.getDeclaredConstructor(Long.TYPE, Long.TYPE);
Field cleanerField = cls.getDeclaredField("cleaner");
try {
constructor.setAccessible(true);
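The probe-and-fall-back on `NoSuchMethodException` is replaced by an explicit version check, since JDK 21 removed the package-private `DirectByteBuffer(long, int)` constructor in favor of `DirectByteBuffer(long, long)` (see the OpenJDK commit referenced in the deleted comment). A hedged sketch of the selection in isolation, using `Runtime.version().feature()` for the major version, which may differ from how `Platform` actually derives `majorVersion`:

```java
import java.lang.reflect.Constructor;

public class DirectByteBufferCtorSketch {
  public static void main(String[] args) throws ReflectiveOperationException {
    int majorVersion = Runtime.version().feature();  // e.g. 17 or 21
    Class<?> cls = Class.forName("java.nio.DirectByteBuffer");
    // (long, int) exists up to JDK 20; JDK 21+ only offers (long, long).
    Constructor<?> constructor = (majorVersion < 21)
        ? cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE)
        : cls.getDeclaredConstructor(Long.TYPE, Long.TYPE);
    System.out.println(constructor);  // only looked up here; Platform also makes it accessible
  }
}
```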
Original file line number Diff line number Diff line change
@@ -42,12 +42,7 @@ public static long roundNumberOfBytesToNearestWord(long numBytes) {
return numBytes + ((8 - remainder) & 0x7);
}

// Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat smaller.
// Be conservative and lower the cap a little.
// Refer to "http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/tip/src/share/classes/java/util/ArrayList.java#l229"
// This value is word rounded. Use this value if the allocated byte arrays are used to store other
// types rather than bytes.
public static final int MAX_ROUNDED_ARRAY_LENGTH = Integer.MAX_VALUE - 15;
public static final int MAX_ROUNDED_ARRAY_LENGTH = ByteArrayUtils.MAX_ROUNDED_ARRAY_LENGTH;

private static final boolean unaligned = Platform.unaligned();
/**
26 changes: 26 additions & 0 deletions common/utils/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.memory;

import org.apache.spark.annotation.Private;

@Private
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}