Skip to content

Commit

Permalink
flush instead of synced-flush inactive shards
Browse files Browse the repository at this point in the history
Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Jan 18, 2022
1 parent e2f5c43 commit 6e7b868
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void testUpdateSnapshotStatus() throws Exception {

public void testSyncedFlushTransition() throws Exception {
Nodes nodes = buildNodeAndVersions();
assertTrue("bwc version is on 1.x or Legacy 7.x", nodes.getBWCVersion().before(Version.V_2_0_0));
assumeTrue("bwc version is on 1.x or Legacy 7.x", nodes.getBWCVersion().before(Version.V_2_0_0));
assumeFalse("no new node found", nodes.getNewNodes().isEmpty());
assumeFalse("no bwc node found", nodes.getBWCNodes().isEmpty());
// Allocate shards to new nodes then verify synced flush requests processed by old nodes/new nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public void testShrinkCommitsMergeOnIdle() throws Exception {
IndexService indexShards = service.indexService(target.getIndex());
IndexShard shard = indexShards.getShard(0);
assertTrue(shard.isActive());
shard.checkIdle(0);
shard.flushOnIdle(0);
assertFalse(shard.isActive());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand Down Expand Up @@ -179,24 +178,6 @@ public void testLockTryingToDelete() throws Exception {
}
}

public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
assertAcked(
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0))
);
client().prepareIndex("test", "test").setSource("{}", XContentType.JSON).get();
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
assertBusy(() -> {
IndexStats indexStats = client().admin().indices().prepareStats("test").clear().setTranslog(true).get().getIndex("test");
assertThat(indexStats.getTotal().translog.getUncommittedOperations(), equalTo(0));
indicesService.indexService(resolveIndex("test")).getShardOrNull(0).checkIdle(0);
});
}

public void testDurableFlagHasEffect() throws Exception {
createIndex("test");
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,6 @@ public void afterIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSha
}
}

@Override
public void onShardInactive(IndexShard indexShard) {
for (IndexEventListener listener : listeners) {
try {
listener.onShardInactive(indexShard);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage("[{}] failed to invoke on shard inactive callback", indexShard.shardId().getId()),
e
);
throw e;
}
}
}

@Override
public void indexShardStateChanged(
IndexShard indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,6 @@ default void indexShardStateChanged(
@Nullable String reason
) {}

/**
* Called when a shard is marked as inactive
*
* @param indexShard The shard that was marked inactive
*/
default void onShardInactive(IndexShard indexShard) {}

/**
* Called before the index gets created. Note that this is also called
* when the index is created on data nodes
Expand Down
27 changes: 18 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1960,7 +1960,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
onNewEngine(newEngine);
currentEngineReference.set(newEngine);
// We set active because we are now writing operations to the engine; this way,
// if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
// we can flush if we go idle after some time and become inactive.
active.set(true);
}
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
Expand Down Expand Up @@ -2156,19 +2156,28 @@ public void addShardFailureCallback(Consumer<ShardFailure> onShardFailure) {

/**
* Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
* indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
* indexing operation, so we can flush the index.
*/
public void checkIdle(long inactiveTimeNS) {
public void flushOnIdle(long inactiveTimeNS) {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
boolean wasActive = active.getAndSet(false);
if (wasActive) {
logger.debug("shard is now inactive");
try {
indexEventListener.onShardInactive(this);
} catch (Exception e) {
logger.warn("failed to notify index event listener", e);
}
logger.debug("flushing shard on inactive");
threadPool.executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (state != IndexShardState.CLOSED) {
logger.warn("failed to flush shard on inactive", e);
}
}

@Override
protected void doRun() {
flush(new FlushRequest().waitIfOngoing(false).force(false));
periodicFlushMetric.inc();
}
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ private void runUnlocked() {
long totalBytesWriting = 0;
for (IndexShard shard : availableShards()) {

// Give shard a chance to transition to inactive so sync'd flush can happen:
// Give shard a chance to transition to inactive so we can flush:
checkIdle(shard, inactiveTime.nanos());

// How many bytes this shard is currently (async'd) moving from heap to disk:
Expand Down Expand Up @@ -443,7 +443,7 @@ private void runUnlocked() {
*/
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
shard.flushOnIdle(inactiveTimeNS);
} catch (AlreadyClosedException e) {
logger.trace(() -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.Segment;
import org.opensearch.index.engine.SegmentsStats;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -3885,7 +3884,7 @@ public void testScheduledRefresh() throws Exception {
indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}");
assertFalse(primary.scheduledRefresh());
assertTrue(primary.isSearchIdle());
primary.checkIdle(0);
primary.flushOnIdle(0);
assertTrue(primary.scheduledRefresh()); // make sure we refresh once the shard is inactive
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(3, searcher.getIndexReader().numDocs());
Expand Down Expand Up @@ -4099,92 +4098,6 @@ public void testSegmentMemoryTrackedWithRandomSearchers() throws Exception {
assertThat(breaker.getUsed(), equalTo(0L));
}

public void testFlushOnInactive() throws Exception {
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
IndexMetadata metadata = IndexMetadata.builder("test")
.putMapping("_doc", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
.settings(settings)
.primaryTerm(0, 1)
.build();
ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(metadata.getIndex(), 0),
"n1",
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final ShardId shardId = shardRouting.shardId();
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);
AtomicBoolean markedInactive = new AtomicBoolean();
AtomicReference<IndexShard> primaryRef = new AtomicReference<>();
IndexShard primary = newShard(
shardRouting,
shardPath,
metadata,
null,
null,
new InternalEngineFactory(),
new EngineConfigFactory(new IndexSettings(metadata, settings)),
() -> {},
RetentionLeaseSyncer.EMPTY,
new IndexEventListener() {
@Override
public void onShardInactive(IndexShard indexShard) {
markedInactive.set(true);
primaryRef.get().flush(new FlushRequest());
}
}
);
primaryRef.set(primary);
recoverShardFromStore(primary);
for (int i = 0; i < 3; i++) {
indexDoc(primary, "_doc", "" + i, "{\"foo\" : \"" + randomAlphaOfLength(10) + "\"}");
primary.refresh("test"); // produce segments
}
List<Segment> segments = primary.segments(false);
Set<String> names = new HashSet<>();
for (Segment segment : segments) {
assertFalse(segment.committed);
assertTrue(segment.search);
names.add(segment.getName());
}
assertEquals(3, segments.size());
primary.flush(new FlushRequest());
primary.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(false));
primary.refresh("test");
segments = primary.segments(false);
for (Segment segment : segments) {
if (names.contains(segment.getName())) {
assertTrue(segment.committed);
assertFalse(segment.search);
} else {
assertFalse(segment.committed);
assertTrue(segment.search);
}
}
assertEquals(4, segments.size());

assertFalse(markedInactive.get());
assertBusy(() -> {
primary.checkIdle(0);
assertFalse(primary.isActive());
});

assertTrue(markedInactive.get());
segments = primary.segments(false);
assertEquals(1, segments.size());
for (Segment segment : segments) {
assertTrue(segment.committed);
assertTrue(segment.search);
}
closeShards(primary);
}

public void testOnCloseStats() throws IOException {
final IndexShard indexShard = newStartedShard(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,6 @@ public void indexShardStateChanged(
delegate.indexShardStateChanged(indexShard, previousState, currentState, reason);
}

@Override
public void onShardInactive(IndexShard indexShard) {
delegate.onShardInactive(indexShard);
}

@Override
public void beforeIndexCreated(Index index, Settings indexSettings) {
delegate.beforeIndexCreated(index, indexSettings);
Expand Down

0 comments on commit 6e7b868

Please sign in to comment.