Skip to content

Commit

Permalink
[GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher (
Browse files Browse the repository at this point in the history
…#4066)

* Support partial commit semantic from copydatapublisher
  • Loading branch information
Will-Lo authored Oct 8, 2024
1 parent d4c9c35 commit 2afab69
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -42,18 +39,21 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
Expand Down Expand Up @@ -358,8 +358,9 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition,
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile) {
CopyableFile copyableFile = (CopyableFile) copyEntity;
preserveFileAttrInPublisher(copyableFile);
if (wus.getWorkingState() == WorkingState.COMMITTED) {
// Committed files should exist in destination otherwise FNFE will be thrown
preserveFileAttrInPublisher(copyableFile);
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus);
// Dataset Output path is injected in each copyableFile.
// This can be optimized by having a dataset level equivalent class for copyable entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Void call()
this.datasetState.setState(JobState.RunningState.COMMITTED);
}
}
} catch (Throwable throwable) {
} catch (Throwable throwable) {
log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
this.jobContext.getJobId()), throwable);
throw new RuntimeException(throwable);
Expand All @@ -192,7 +192,7 @@ public Void call()
}

} catch (IOException | RuntimeException ioe) {
log.error(String
log.error(String
.format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
ioe);
throw new RuntimeException(ioe);
Expand Down

0 comments on commit 2afab69

Please sign in to comment.