Skip to content

Commit

Permalink
Fixing failing compaction/parallel index jobs during upgrade due to n…
Browse files Browse the repository at this point in the history
…ew actions not being available on the overlord. (apache#15430)

* Fixing failing compaction/parallel index jobs during upgrade due to new actions not available on the overlord.

* Fixing build

* Removing extra space.

* Fixing json getter.

* Review comments.

(cherry picked from commit a018819)
  • Loading branch information
cryptoe committed Nov 29, 2023
1 parent f18978c commit d79f04d
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -371,7 +372,8 @@ Pair<SortedMap<Interval, String>, Map<Interval, List<DataSegment>>> checkSegment
// Pair<interval -> max(created_date), interval -> list<DataSegment>>
Pair<Map<Interval, String>, Map<Interval, List<DataSegment>>> baseSegmentsSnapshot =
getMaxCreateDateAndBaseSegments(
metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(), Intervals.ETERNITY)
metadataStorageCoordinator.retrieveUsedSegmentsAndCreatedDates(spec.getBaseDataSource(),
Collections.singletonList(Intervals.ETERNITY))
);
// baseSegments are used to create HadoopIndexTask
Map<Interval, List<DataSegment>> baseSegments = baseSegmentsSnapshot.rhs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
Expand All @@ -38,6 +37,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -63,20 +63,18 @@ public class RetrieveSegmentsToReplaceAction implements TaskAction<Collection<Da
{
private static final Logger log = new Logger(RetrieveSegmentsToReplaceAction.class);

@JsonIgnore
private final String dataSource;

@JsonIgnore
private final Interval interval;
private final List<Interval> intervals;

@JsonCreator
public RetrieveSegmentsToReplaceAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
@JsonProperty("intervals") List<Interval> intervals
)
{
this.dataSource = dataSource;
this.interval = interval;
this.intervals = intervals;
}

@JsonProperty
Expand All @@ -86,9 +84,9 @@ public String getDataSource()
}

@JsonProperty
public Interval getInterval()
public List<Interval> getIntervals()
{
return interval;
return intervals;
}

@Override
Expand Down Expand Up @@ -128,7 +126,7 @@ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)

Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();
for (Pair<DataSegment, String> segmentAndCreatedDate :
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, interval)) {
toolbox.getIndexerMetadataStorageCoordinator().retrieveUsedSegmentsAndCreatedDates(dataSource, intervals)) {
final DataSegment segment = segmentAndCreatedDate.lhs;
final String created = segmentAndCreatedDate.rhs;
intervalToCreatedToSegments.computeIfAbsent(segment.getInterval(), s -> new HashMap<>())
Expand Down Expand Up @@ -165,7 +163,7 @@ public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
private Collection<DataSegment> retrieveAllVisibleSegments(TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUsedSegmentsForInterval(dataSource, interval, Segments.ONLY_VISIBLE);
.retrieveUsedSegmentsForIntervals(dataSource, intervals, Segments.ONLY_VISIBLE);
}

@Override
Expand All @@ -185,25 +183,20 @@ public boolean equals(Object o)
}

RetrieveSegmentsToReplaceAction that = (RetrieveSegmentsToReplaceAction) o;

if (!dataSource.equals(that.dataSource)) {
return false;
}
return interval.equals(that.interval);
return Objects.equals(dataSource, that.dataSource) && Objects.equals(intervals, that.intervals);
}

@Override
public int hashCode()
{
return Objects.hash(dataSource, interval);
return Objects.hash(dataSource, intervals);
}

@Override
public String toString()
{
return getClass().getSimpleName() + "{" +
return "RetrieveSegmentsToReplaceAction{" +
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", intervals=" + intervals +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public enum BatchProcessingMode
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true;
private static final long DEFAULT_TMP_STORAGE_BYTES_PER_TASK = -1;
private static final boolean DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE = false;

@JsonProperty
private final String baseDir;
Expand Down Expand Up @@ -125,6 +126,9 @@ public enum BatchProcessingMode
@JsonProperty
private final long tmpStorageBytesPerTask;

@JsonProperty
private final boolean enableConcurrentAppendAndReplace;

@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
Expand All @@ -142,7 +146,8 @@ public TaskConfig(
@JsonProperty("batchProcessingMode") String batchProcessingMode,
@JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
@JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
@JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask,
@JsonProperty("enableConcurrentAppendAndReplace") @Nullable Boolean enableConcurrentAppendAndReplace
)
{
this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
Expand Down Expand Up @@ -193,6 +198,10 @@ public TaskConfig(

this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
this.tmpStorageBytesPerTask = Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
this.enableConcurrentAppendAndReplace = Configs.valueOrDefault(
enableConcurrentAppendAndReplace,
DEFAULT_ENABLE_CONCURRENT_APPEND_AND_REPLACE
);
}

private TaskConfig(
Expand All @@ -210,7 +219,8 @@ private TaskConfig(
BatchProcessingMode batchProcessingMode,
boolean storeEmptyColumns,
boolean encapsulatedTask,
long tmpStorageBytesPerTask
long tmpStorageBytesPerTask,
boolean enableConcurrentAppendAndReplace
)
{
this.baseDir = baseDir;
Expand All @@ -228,6 +238,7 @@ private TaskConfig(
this.storeEmptyColumns = storeEmptyColumns;
this.encapsulatedTask = encapsulatedTask;
this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
this.enableConcurrentAppendAndReplace = enableConcurrentAppendAndReplace;
}

@JsonProperty
Expand Down Expand Up @@ -344,6 +355,12 @@ public long getTmpStorageBytesPerTask()
return tmpStorageBytesPerTask;
}

// Whether concurrent append-and-replace is enabled for tasks. The explicit
// JSON name keeps serialization compatible with the constructor's
// @JsonProperty("enableConcurrentAppendAndReplace") parameter, since the
// getter itself follows the "is...Enabled" naming style.
@JsonProperty("enableConcurrentAppendAndReplace")
public boolean isConcurrentAppendAndReplaceEnabled()
{
return enableConcurrentAppendAndReplace;
}

private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
Expand All @@ -370,7 +387,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir)
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask
tmpStorageBytesPerTask,
enableConcurrentAppendAndReplace
);
}

Expand All @@ -391,7 +409,8 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
batchProcessingMode,
storeEmptyColumns,
encapsulatedTask,
tmpStorageBytesPerTask
tmpStorageBytesPerTask,
enableConcurrentAppendAndReplace
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -543,15 +543,15 @@ public static List<TimelineObjectHolder<String, DataSegment>> getTimelineForInte
Preconditions.checkNotNull(interval);

final Collection<DataSegment> usedSegments;
if (toolbox == null) {
if (toolbox == null || !toolbox.getConfig().isConcurrentAppendAndReplaceEnabled()) {
usedSegments = FutureUtils.getUnchecked(
coordinatorClient.fetchUsedSegments(dataSource, Collections.singletonList(interval)),
true
);
} else {
try {
usedSegments = toolbox.getTaskActionClient()
.submit(new RetrieveSegmentsToReplaceAction(dataSource, interval));
.submit(new RetrieveSegmentsToReplaceAction(dataSource, Collections.singletonList(interval)));
}
catch (IOException e) {
LOG.error(e, "Error retrieving the used segments for interval[%s].", interval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TaskConfigBuilder
private Boolean storeEmptyColumns;
private boolean enableTaskLevelLogPush;
private Long tmpStorageBytesPerTask;
private Boolean enableConcurrentAppendAndReplace;

public TaskConfigBuilder setBaseDir(String baseDir)
{
Expand Down Expand Up @@ -132,6 +133,18 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask)
return this;
}

/**
 * Sets the concurrent append-and-replace flag to {@code true} on the built config.
 *
 * @return this builder, for call chaining
 */
public TaskConfigBuilder enableConcurrentAppendAndReplace()
{
this.enableConcurrentAppendAndReplace = true;
return this;
}

/**
 * Sets the concurrent append-and-replace flag to {@code false} on the built config.
 *
 * @return this builder, for call chaining
 */
public TaskConfigBuilder disableConcurrentAppendAndReplace()
{
this.enableConcurrentAppendAndReplace = false;
return this;
}

public TaskConfig build()
{
return new TaskConfig(
Expand All @@ -149,7 +162,8 @@ public TaskConfig build()
batchProcessingMode,
storeEmptyColumns,
enableTaskLevelLogPush,
tmpStorageBytesPerTask
tmpStorageBytesPerTask,
enableConcurrentAppendAndReplace
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ private void verifyInputSegments(Task task, Interval interval, DataSegment... ex
Collection<DataSegment> allUsedSegments = taskActionClient.submit(
new RetrieveSegmentsToReplaceAction(
WIKI,
interval
Collections.singletonList(interval)
)
);
Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
.setDefaultRowFlushBoundary(50000)
.setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name())
.setTmpStorageBytesPerTask(-1L)
.enableConcurrentAppendAndReplace()
.build();

return new TaskToolboxFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public List<DataSegment> retrieveAllUsedSegments(String dataSource, Segments vis
}

@Override
public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals)
{
return ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ default Collection<DataSegment> retrieveUsedSegmentsForInterval(
/**
*
* Retrieve all published segments which are marked as used and the created_date of these segments belonging to the
* given data source and interval from the metadata store.
* given data source and list of intervals from the metadata store.
*
* Unlike other similar methods in this interface, this method doesn't accept a {@link Segments} "visibility"
* parameter. The returned collection may include overshadowed segments and their created_dates, as if {@link
* Segments#INCLUDING_OVERSHADOWED} was passed. It's the responsibility of the caller to filter out overshadowed ones
* if needed.
*
* @param dataSource The data source to query
* @param interval The interval to query
* @param intervals The list of intervals to query
*
* @return The DataSegments and the related created_date of segments
*/
Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval);
Collection<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals);

/**
* Retrieve all published segments which may include any data in the given intervals and are marked as used from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,23 @@ private Collection<DataSegment> doRetrieveUsedSegments(
}

@Override
public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, Interval interval)
public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(String dataSource, List<Interval> intervals)
{
StringBuilder queryBuilder = new StringBuilder(
"SELECT created_date, payload FROM %1$s WHERE dataSource = :dataSource AND used = true"
);

final List<Interval> intervals = new ArrayList<>();
// Do not need an interval condition if the interval is ETERNITY
if (!Intervals.isEternity(interval)) {
intervals.add(interval);
boolean hasEternityInterval = false;
for (Interval interval : intervals) {
if (Intervals.isEternity(interval)) {
hasEternityInterval = true;
break;
}
}

SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
queryBuilder,
intervals,
hasEternityInterval ? Collections.emptyList() : intervals,
SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS,
connector
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2658,28 +2658,28 @@ public void testRetrieveUsedSegmentsAndCreatedDates()
insertUsedSegments(ImmutableSet.of(defaultSegment));

List<Pair<DataSegment, String>> resultForIntervalOnTheLeft =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2001"));
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2001")));
Assert.assertTrue(resultForIntervalOnTheLeft.isEmpty());

List<Pair<DataSegment, String>> resultForIntervalOnTheRight =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("3000/3001"));
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("3000/3001")));
Assert.assertTrue(resultForIntervalOnTheRight.isEmpty());

List<Pair<DataSegment, String>> resultForExactInterval =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval()));
Assert.assertEquals(1, resultForExactInterval.size());
Assert.assertEquals(defaultSegment, resultForExactInterval.get(0).lhs);

List<Pair<DataSegment, String>> resultForIntervalWithLeftOverlap =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2000/2015-01-02"));
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2000/2015-01-02")));
Assert.assertEquals(resultForExactInterval, resultForIntervalWithLeftOverlap);

List<Pair<DataSegment, String>> resultForIntervalWithRightOverlap =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.of("2015-01-01/3000"));
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.of("2015-01-01/3000")));
Assert.assertEquals(resultForExactInterval, resultForIntervalWithRightOverlap);

List<Pair<DataSegment, String>> resultForEternity =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Intervals.ETERNITY);
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(Intervals.ETERNITY));
Assert.assertEquals(resultForExactInterval, resultForEternity);
}

Expand All @@ -2690,11 +2690,11 @@ public void testRetrieveUsedSegmentsAndCreatedDatesFetchesEternityForAnyInterval
insertUsedSegments(ImmutableSet.of(eternitySegment, firstHalfEternityRangeSegment, secondHalfEternityRangeSegment));

List<Pair<DataSegment, String>> resultForRandomInterval =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), defaultSegment.getInterval());
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(defaultSegment.getInterval()));
Assert.assertEquals(3, resultForRandomInterval.size());

List<Pair<DataSegment, String>> resultForEternity =
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), eternitySegment.getInterval());
coordinator.retrieveUsedSegmentsAndCreatedDates(defaultSegment.getDataSource(), Collections.singletonList(eternitySegment.getInterval()));
Assert.assertEquals(3, resultForEternity.size());
}

Expand Down

0 comments on commit d79f04d

Please sign in to comment.