From daf3d08f5cfc995beb0917cf4519a3ce20601af3 Mon Sep 17 00:00:00 2001 From: svc-excavator-bot Date: Fri, 6 Dec 2019 03:25:34 +0000 Subject: [PATCH] Excavator: Upgrades Baseline to the latest version --- .../async/s3/client/basic/HadoopShuffleClient.java | 2 +- .../merging/MergingShuffleUploadCoordinator.java | 2 +- .../s3/reader/FallbackToS3ShuffleIterator.java | 13 ++----------- .../async/s3/client/AsyncShuffleBenchmark.java | 8 ++------ build.gradle | 2 +- 5 files changed, 7 insertions(+), 20 deletions(-) diff --git a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/basic/HadoopShuffleClient.java b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/basic/HadoopShuffleClient.java index c62bb785..d209450f 100644 --- a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/basic/HadoopShuffleClient.java +++ b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/basic/HadoopShuffleClient.java @@ -185,7 +185,7 @@ private Optional doWriteFilesAndClose( SafeArg.of("attemptId", attemptId)); return Optional.empty(); } - } catch (Exception e) { + } catch (RuntimeException e) { LOGGER.error("Exception encountered while checking for existence of shuffle.", SafeArg.of("appId", appId), SafeArg.of("shuffleId", shuffleId), diff --git a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/merging/MergingShuffleUploadCoordinator.java b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/merging/MergingShuffleUploadCoordinator.java index 0ec13b5c..9166f1e6 100644 --- a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/merging/MergingShuffleUploadCoordinator.java +++ b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/client/merging/MergingShuffleUploadCoordinator.java @@ -132,7 +132,7 @@ private void uploadEligibleBatches() { } } LOGGER.trace("Finished checking for uploading shuffle files to the backing store."); - } catch (Exception e) { + } catch (RuntimeException e) { LOGGER.warn("Failed to process outstanding uploads for shuffle files.", e); } } diff --git a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/reader/FallbackToS3ShuffleIterator.java b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/reader/FallbackToS3ShuffleIterator.java index 19628853..e0eb2ff2 100644 --- a/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/reader/FallbackToS3ShuffleIterator.java +++ b/async-shuffle-upload-core/src/main/java/com/palantir/spark/shuffle/async/s3/reader/FallbackToS3ShuffleIterator.java @@ -29,7 +29,6 @@ import org.apache.spark.io.CompressionCodec; import org.apache.spark.palantir.shuffle.async.ShuffleDriverEndpointRef; import org.apache.spark.serializer.SerializerManager; -import org.apache.spark.shuffle.FetchFailedException; import org.apache.spark.shuffle.api.ShuffleBlockInfo; import org.apache.spark.shuffle.api.ShuffleBlockInputStream; import org.apache.spark.storage.BlockId; @@ -94,16 +93,8 @@ public ShuffleBlockInputStream next() { BlockId resultBlock = resultStream.getBlockId(); ShuffleBlockId resolvedBlockId = convertBlockId(resultBlock); remainingAttemptsByBlock.remove(resolvedBlockId); - } catch (Throwable e) { - if (e instanceof FetchFailedException) { - LOG.warn( - "Failed to fetch block the regular way, due to a fetch failed" - + " exception. Fetching from the hadoop file system instead.", - e); - ShuffleBlockInfo blockInfo = - remainingAttemptsByBlock.get(((FetchFailedException) e).getShuffleBlockId()); - driverEndpointRef.blacklistExecutor(blockInfo.getShuffleLocation().get()); - } + } catch (RuntimeException | Error e) { + throw e; } } else if (!s3FetcherIterator.isInitialized()) { diff --git a/async-shuffle-upload-data-generator/src/main/java/com/palantir/spark/shuffle/async/s3/client/AsyncShuffleBenchmark.java b/async-shuffle-upload-data-generator/src/main/java/com/palantir/spark/shuffle/async/s3/client/AsyncShuffleBenchmark.java index 468f4b81..f4494265 100644 --- a/async-shuffle-upload-data-generator/src/main/java/com/palantir/spark/shuffle/async/s3/client/AsyncShuffleBenchmark.java +++ b/async-shuffle-upload-data-generator/src/main/java/com/palantir/spark/shuffle/async/s3/client/AsyncShuffleBenchmark.java @@ -127,18 +127,14 @@ public double runRead() throws ExecutionException, InterruptedException { for (int j = 0; j < numPartitions; j++) { int reduceId = j; downloadFutures.add(downloadService.submit(() -> { - ListenableFuture future = client.getBlockData( - SHUFFLE_ID, - mapId, - reduceId, - ATTEMPT_ID); + ListenableFuture future = client.getBlockData(SHUFFLE_ID, mapId, reduceId, ATTEMPT_ID); byte[] bytes = new byte[READ_BUFFER_SIZE]; try (InputStream stream = future.get()) { while (stream.read(bytes) != -1) { // do nothing } return true; - } catch (Exception e) { + } catch (ExecutionException | IOException | InterruptedException | RuntimeException e) { throw new RuntimeException(e); } })); diff --git a/build.gradle b/build.gradle index 57397c9a..4a6cb188 100644 --- a/build.gradle +++ b/build.gradle @@ -24,7 +24,7 @@ buildscript { classpath 'com.netflix.nebula:gradle-info-plugin:5.2.0' classpath 'com.netflix.nebula:nebula-publishing-plugin:14.1.1' classpath 'com.palantir.gradle.consistentversions:gradle-consistent-versions:1.12.4' - classpath 'com.palantir.baseline:gradle-baseline-java:2.35.2' + classpath 'com.palantir.baseline:gradle-baseline-java:2.40.1' classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.2' classpath 'gradle.plugin.org.inferred:gradle-processors:3.1.0' classpath 'com.palantir.sls-packaging:gradle-sls-packaging:4.3.3'