Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Excavator: Upgrades Baseline to the latest version #4

Merged
merged 1 commit into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private Optional<ShuffleUploadTaskMetrics> doWriteFilesAndClose(
SafeArg.of("attemptId", attemptId));
return Optional.empty();
}
} catch (Exception e) {
} catch (RuntimeException e) {
LOGGER.error("Exception encountered while checking for existence of shuffle.",
SafeArg.of("appId", appId),
SafeArg.of("shuffleId", shuffleId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private void uploadEligibleBatches() {
}
}
LOGGER.trace("Finished checking for uploading shuffle files to the backing store.");
} catch (Exception e) {
} catch (RuntimeException e) {
LOGGER.warn("Failed to process outstanding uploads for shuffle files.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.palantir.shuffle.async.ShuffleDriverEndpointRef;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.shuffle.FetchFailedException;
import org.apache.spark.shuffle.api.ShuffleBlockInfo;
import org.apache.spark.shuffle.api.ShuffleBlockInputStream;
import org.apache.spark.storage.BlockId;
Expand Down Expand Up @@ -94,16 +93,8 @@ public ShuffleBlockInputStream next() {
BlockId resultBlock = resultStream.getBlockId();
ShuffleBlockId resolvedBlockId = convertBlockId(resultBlock);
remainingAttemptsByBlock.remove(resolvedBlockId);
} catch (Throwable e) {
if (e instanceof FetchFailedException) {
LOG.warn(
"Failed to fetch block the regular way, due to a fetch failed"
+ " exception. Fetching from the hadoop file system instead.",
e);
ShuffleBlockInfo blockInfo =
remainingAttemptsByBlock.get(((FetchFailedException) e).getShuffleBlockId());
driverEndpointRef.blacklistExecutor(blockInfo.getShuffleLocation().get());
}
} catch (RuntimeException | Error e) {

throw e;
}
} else if (!s3FetcherIterator.isInitialized()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,14 @@ public double runRead() throws ExecutionException, InterruptedException {
for (int j = 0; j < numPartitions; j++) {
int reduceId = j;
downloadFutures.add(downloadService.submit(() -> {
ListenableFuture<InputStream> future = client.getBlockData(
SHUFFLE_ID,
mapId,
reduceId,
ATTEMPT_ID);
ListenableFuture<InputStream> future = client.getBlockData(SHUFFLE_ID, mapId, reduceId, ATTEMPT_ID);
byte[] bytes = new byte[READ_BUFFER_SIZE];
try (InputStream stream = future.get()) {
while (stream.read(bytes) != -1) {
// do nothing
}
return true;
} catch (Exception e) {
} catch (ExecutionException | IOException | InterruptedException | RuntimeException e) {
throw new RuntimeException(e);
}
}));
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ buildscript {
classpath 'com.netflix.nebula:gradle-info-plugin:5.2.0'
classpath 'com.netflix.nebula:nebula-publishing-plugin:14.1.1'
classpath 'com.palantir.gradle.consistentversions:gradle-consistent-versions:1.12.4'
classpath 'com.palantir.baseline:gradle-baseline-java:2.35.2'
classpath 'com.palantir.baseline:gradle-baseline-java:2.40.1'
classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.2'
classpath 'gradle.plugin.org.inferred:gradle-processors:3.1.0'
classpath 'com.palantir.sls-packaging:gradle-sls-packaging:4.3.3'
Expand Down