diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index 722f1f3fddd..ef8a0e72e8f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -25,9 +25,6 @@ import java.util.Map; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -42,18 +39,21 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.configuration.WorkUnitState.WorkingState; +import org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.CopySource; import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata; import org.apache.gobblin.data.management.copy.CopyableFile; -import org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.PreserveAttributes; import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity; import org.apache.gobblin.data.management.copy.entities.PostPublishStep; @@ -358,8 +358,9 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus); if (copyEntity instanceof CopyableFile) { CopyableFile copyableFile = (CopyableFile) copyEntity; - preserveFileAttrInPublisher(copyableFile); if (wus.getWorkingState() == WorkingState.COMMITTED) { + // Committed files should exist in destination otherwise FNFE will be thrown + preserveFileAttrInPublisher(copyableFile); CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus); // Dataset Output path is injected in each copyableFile. // This can be optimized by having a dataset level equivalent class for copyable entities diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index e07083ff1d3..70a9d9a96eb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -175,7 +175,7 @@ public Void call() this.datasetState.setState(JobState.RunningState.COMMITTED); } } - } catch (Throwable throwable) { + } catch (Throwable throwable) { log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn, this.jobContext.getJobId()), throwable); throw new RuntimeException(throwable); @@ -192,7 +192,7 @@ public Void call() } } catch (IOException | RuntimeException ioe) { - log.error(String + log.error(String .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()), ioe); throw new RuntimeException(ioe);