Hive: Avoid closing null writer in output committer abortTask (#2150)
Co-authored-by: Marton Bod <mbod@cloudera.com>
marton-bod authored Feb 3, 2021
1 parent e4185cd commit a42df9e
Showing 3 changed files with 52 additions and 6 deletions.
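For context: before this patch, commitTask removed the writer from the static per-attempt map at the start of the commit, so a failure later in commitTask left the map empty and the subsequent abortTask dereferenced a null writer. Below is a minimal, self-contained sketch of that old flow, assuming simplified names (SketchWriter and a string-keyed map stand in for HiveIcebergRecordWriter and the TaskAttemptID-keyed registry; this is illustrative, not Iceberg's API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OldFlowSketch {
  // Stand-in for HiveIcebergRecordWriter; close(true) deletes the files written so far.
  static class SketchWriter {
    void close(boolean abort) { /* clean up data files when abort == true */ }
  }

  // Stand-in for the static writers map keyed by TaskAttemptID.
  static final Map<String, SketchWriter> writers = new ConcurrentHashMap<>();

  // Old commitTask: unregistered the writer first, then did work that could fail.
  static void commitTask(String attemptId) {
    SketchWriter writer = writers.remove(attemptId); // writer leaves the map here
    writer.close(false);
    throw new RuntimeException("creating the commit file failed");
  }

  // Old abortTask: assumed the writer was still registered.
  static void abortTask(String attemptId) {
    SketchWriter writer = writers.remove(attemptId); // null after the failed commit
    writer.close(true);                              // NullPointerException
  }

  public static void main(String[] args) {
    writers.put("attempt_0", new SketchWriter());
    try {
      commitTask("attempt_0");
    } catch (RuntimeException e) {
      abortTask("attempt_0"); // before this fix: NPE masks the original failure
    }
  }
}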
HiveIcebergOutputCommitter.java
@@ -58,7 +58,7 @@
* An Iceberg table committer for adding data files to the Iceberg tables.
* Currently independent of the Hive ACID transactions.
*/
-public final class HiveIcebergOutputCommitter extends OutputCommitter {
+public class HiveIcebergOutputCommitter extends OutputCommitter {
private static final String FOR_COMMIT_EXTENSION = ".forCommit";

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
@@ -90,7 +90,7 @@ public void commitTask(TaskAttemptContext context) throws IOException {
TaskAttemptID attemptID = context.getTaskAttemptID();
String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
attemptID.getJobID(), attemptID.getTaskID().getId());
-HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.getWriter(attemptID);

DataFile[] closedFiles;
if (writer != null) {
@@ -101,6 +101,9 @@ public void commitTask(TaskAttemptContext context) throws IOException {

// Creating the file containing the data files generated by this task
createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io(context.getJobConf()));

+// remove the writer to release the object
+HiveIcebergRecordWriter.removeWriter(attemptID);
}

/**
@@ -114,7 +117,9 @@ public void abortTask(TaskAttemptContext context) throws IOException {
HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());

// Remove files if it was not done already
-writer.close(true);
+if (writer != null) {
+  writer.close(true);
+}
}

/**
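The fix has two parts, visible in the hunks above: commitTask now only peeks at the writer with getWriter and unregisters it after the commit file has been created, and abortTask null-checks the writer it removes. Continuing the sketch above with the fixed flow (same illustrative stand-ins):

// Fixed commitTask: look up without removing, release only after success.
static void commitTaskFixed(String attemptId) {
  SketchWriter writer = writers.get(attemptId); // peek: writer stays registered
  if (writer != null) {
    writer.close(false);
  }
  // ... create the ".forCommit" file listing this task's data files ...
  writers.remove(attemptId); // reached only if the commit succeeded
}

// Fixed abortTask: tolerate a writer that was never created or already released.
static void abortTaskFixed(String attemptId) {
  SketchWriter writer = writers.remove(attemptId);
  if (writer != null) {
    writer.close(true);
  }
}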
HiveIcebergRecordWriter.java
@@ -58,6 +58,10 @@ static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) {
return writers.remove(taskAttemptID);
}

+static HiveIcebergRecordWriter getWriter(TaskAttemptID taskAttemptID) {
+  return writers.get(taskAttemptID);
+}

HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
TaskAttemptID taskAttemptID) {
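The supporting change here is a non-destructive lookup: removeWriter both fetches and unregisters, while the new getWriter only fetches. A generic sketch of the pattern (WriterRegistry is hypothetical; the real map is a static field of HiveIcebergRecordWriter keyed by TaskAttemptID):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class WriterRegistry<K, W> {
  private final Map<K, W> writers = new ConcurrentHashMap<>();

  void register(K attemptId, W writer) {
    writers.put(attemptId, writer);
  }

  // Non-destructive: commitTask can use the writer while it stays registered,
  // so abortTask can still find and clean it up after a failed commit.
  W get(K attemptId) {
    return writers.get(attemptId);
  }

  // Destructive: called once the attempt is finished, committed or aborted.
  W remove(K attemptId) {
    return writers.remove(attemptId);
  }
}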
TestHiveIcebergOutputCommitter.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobStatus;
@@ -54,7 +55,10 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;

+import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriter;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestHiveIcebergOutputCommitter {
@@ -182,6 +186,33 @@ public void testAbortJob() throws IOException {
HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
}

+@Test
+public void writerIsClosedAfterTaskCommitFailure() throws IOException {
+  HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
+  HiveIcebergOutputCommitter failingCommitter = Mockito.spy(committer);
+  ArgumentCaptor<TaskAttemptContextImpl> argumentCaptor = ArgumentCaptor.forClass(TaskAttemptContextImpl.class);
+  String exceptionMessage = "Commit task failed!";
+  Mockito.doThrow(new RuntimeException(exceptionMessage))
+      .when(failingCommitter).commitTask(argumentCaptor.capture());
+
+  Table table = table(temp.getRoot().getPath(), false);
+  JobConf conf = jobConf(table, 1);
+  try {
+    writeRecords(1, 0, true, false, conf, failingCommitter);
+    Assert.fail();
+  } catch (RuntimeException e) {
+    Assert.assertTrue(e.getMessage().contains(exceptionMessage));
+  }
+
+  Assert.assertEquals(1, argumentCaptor.getAllValues().size());
+  TaskAttemptID capturedId = argumentCaptor.getValue().getTaskAttemptID();
+  // writer is still in the map after commitTask failure
+  Assert.assertNotNull(getWriter(capturedId));
+  failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
+  // abortTask succeeds and removes writer
+  Assert.assertNull(getWriter(capturedId));
+}

private Table table(String location, boolean partitioned) {
HadoopTables tables = new HadoopTables();
return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location);
@@ -212,11 +243,12 @@ private JobConf jobConf(Table table, int taskNum) {
* @param abortTasks If <code>true</code> the tasks will be aborted - needed so we can simulate no commit/no abort
* situation
* @param conf The job configuration
+ * @param committer The output committer that should be used for committing/aborting the tasks
* @return The random generated records which were appended to the table
* @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
*/
private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
-JobConf conf) throws IOException {
+JobConf conf, OutputCommitter committer) throws IOException {
List<Record> expected = new ArrayList<>(RECORD_NUM * taskNum);

FileIO io = HiveIcebergStorageHandler.io(conf);
@@ -243,13 +275,18 @@ private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTas

testWriter.close(false);
if (commitTasks) {
-new HiveIcebergOutputCommitter().commitTask(new TaskAttemptContextImpl(conf, taskId));
+committer.commitTask(new TaskAttemptContextImpl(conf, taskId));
expected.addAll(records);
} else if (abortTasks) {
-new HiveIcebergOutputCommitter().abortTask(new TaskAttemptContextImpl(conf, taskId));
+committer.abortTask(new TaskAttemptContextImpl(conf, taskId));
}
}

return expected;
}

+private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+    JobConf conf) throws IOException {
+  return writeRecords(taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
+}
}
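The new test leans on a standard Mockito pattern: spy a real committer, stub commitTask to throw, and capture the TaskAttemptContext it was invoked with so the same attempt can be aborted afterwards. A condensed, self-contained sketch of that pattern (the Committer class is hypothetical; the Mockito calls are the library's real API):

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class SpyPatternSketch {
  // Hypothetical collaborator standing in for HiveIcebergOutputCommitter.
  static class Committer {
    public void commitTask(String context) { /* real work */ }
  }

  public static void main(String[] args) {
    Committer failing = Mockito.spy(new Committer());
    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);

    // Stub the spy so commitTask always throws, while recording its argument.
    Mockito.doThrow(new RuntimeException("Commit task failed!"))
        .when(failing).commitTask(captor.capture());

    try {
      failing.commitTask("attempt_0"); // throws the stubbed exception
    } catch (RuntimeException expected) {
      // expected: the argument was captured before the throw
    }

    System.out.println(captor.getValue()); // prints "attempt_0"
  }
}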
