Skip to content

Commit

Permalink
Merge pull request #1426 from HubSpot/prevent-duplicate-immediate-upl…
Browse files Browse the repository at this point in the history
…oaders

Prevent building duplicate immediate uploaders
  • Loading branch information
ssalinas authored Feb 24, 2017
2 parents ad679c0 + d97339f commit 2605887
Showing 1 changed file with 47 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class SingularityS3UploaderDriver extends WatchServiceHelper implements S
private final String hostname;
private final SingularityRunnerExceptionNotifier exceptionNotifier;

private final List<S3UploadMetadata> immediateUploadMetadata;
private final ReentrantLock lock;
private final ConcurrentMap<SingularityS3Uploader, Future<Integer>> immediateUploaders;

private ScheduledFuture<?> future;
Expand Down Expand Up @@ -108,6 +110,8 @@ public SingularityS3UploaderDriver(SingularityRunnerBaseConfiguration baseConfig
this.hostname = hostname;
this.exceptionNotifier = exceptionNotifier;

this.immediateUploadMetadata = new ArrayList<>();
this.lock = new ReentrantLock();
this.immediateUploaders = Maps.newConcurrentMap();
}

Expand Down Expand Up @@ -237,6 +241,7 @@ private int checkUploads() {
for (SingularityS3Uploader uploader : toRemove) {
metrics.getImmediateUploaderCounter().dec();
immediateUploaders.remove(uploader);
immediateUploadMetadata.remove(uploader.getUploadMetadata());

try {
LOG.debug("Deleting finished immediate uploader {}", uploader.getMetadataPath());
Expand Down Expand Up @@ -400,14 +405,18 @@ private boolean handleNewOrModifiedS3Metadata(Path filename) throws IOException
if (existingUploader != null) {
if (metadata.getUploadImmediately().isPresent() && metadata.getUploadImmediately().get()) {
LOG.debug("Existing metadata {} from {} changed to be immediate, forcing upload", metadata, filename);
metrics.getUploaderCounter().dec();
metrics.getImmediateUploaderCounter().inc();
if (canCreateImmediateUploader(metadata)) {
metrics.getUploaderCounter().dec();
metrics.getImmediateUploaderCounter().inc();

metadataToUploader.remove(existingUploader.getUploadMetadata());
uploaderLastHadFilesAt.remove(existingUploader);
expiring.remove(existingUploader);
performImmediateUpload(existingUploader);
return true;
metadataToUploader.remove(existingUploader.getUploadMetadata());
uploaderLastHadFilesAt.remove(existingUploader);
expiring.remove(existingUploader);
performImmediateUpload(existingUploader);
return true;
} else {
return false;
}
} else if (existingUploader.getUploadMetadata().isFinished() == metadata.isFinished()) {
LOG.debug("Ignoring metadata {} from {} because there was already one present", metadata, filename);
return false;
Expand Down Expand Up @@ -443,14 +452,19 @@ private boolean handleNewOrModifiedS3Metadata(Path filename) throws IOException

if (metadata.getUploadImmediately().isPresent()
&& metadata.getUploadImmediately().get()) {
metrics.getImmediateUploaderCounter().inc();
this.performImmediateUpload(uploader);
if (canCreateImmediateUploader(metadata)) {
metrics.getImmediateUploaderCounter().inc();
this.performImmediateUpload(uploader);
return true;
} else {
return false;
}
} else {
metrics.getUploaderCounter().inc();
metadataToUploader.put(metadata, uploader);
uploaderLastHadFilesAt.put(uploader, System.currentTimeMillis());
return true;
}
return true;
} catch (Throwable t) {
LOG.info("Ignoring metadata {} because uploader couldn't be created", metadata, t);
return false;
Expand Down Expand Up @@ -515,4 +529,27 @@ private boolean isS3MetadataFile(Path filename) {

return true;
}

private boolean canCreateImmediateUploader(S3UploadMetadata metadata) {
try {
if (lock.tryLock(400, TimeUnit.MILLISECONDS)) {
if (this.immediateUploadMetadata.contains(metadata)) {
LOG.debug("Already have an immediate uploader for metadata {}.", metadata);
return false;
} else {
LOG.debug("Preparing to create new immediate uploader for metadata {}.", metadata);
this.immediateUploadMetadata.add(metadata);
return true;
}
} else {
LOG.debug("Could not acquire lock to create an immediate uploader for metadata {}.", metadata);
return false;
}
} catch (InterruptedException exn) {
LOG.debug("Interrupted while waiting on a lock to create an immediate uploader for metadata {}.", metadata);
return false;
} finally {
lock.unlock();
}
}
}

0 comments on commit 2605887

Please sign in to comment.