more trace
dnhatn committed Jul 26, 2018
1 parent 327f1ba commit 78c1b39
Showing 6 changed files with 49 additions and 22 deletions. Added lines below are marked "+" and removed lines "-"; unchanged context is unmarked, and each file's hunks are headed by the class they touch.
File 1/6: EsTieredMergePolicy.java (org.elasticsearch.index)
@@ -20,6 +20,8 @@
package org.elasticsearch.index;

import org.apache.lucene.index.FilterMergePolicy;
+ import org.apache.lucene.index.MergePolicy;
+ import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.TieredMergePolicy;
@@ -42,12 +44,26 @@ final class EsTieredMergePolicy extends FilterMergePolicy {
regularMergePolicy = (TieredMergePolicy) in;
forcedMergePolicy = new TieredMergePolicy();
forcedMergePolicy.setMaxMergedSegmentMB(Double.POSITIVE_INFINITY); // unlimited
+ regularMergePolicy.setDeletesPctAllowed(48);
}

@Override
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount,
Map<SegmentCommitInfo, Boolean> segmentsToMerge, MergeContext mergeContext) throws IOException {
- return forcedMergePolicy.findForcedMerges(infos, maxSegmentCount, segmentsToMerge, mergeContext);
+ MergeSpecification spec = forcedMergePolicy.findForcedMerges(infos, maxSegmentCount, segmentsToMerge, mergeContext);
+ if (spec != null && spec.merges.size() == 2) { // findForcedMerges may return null when there is nothing to merge
+     // debugging hook: repeat the call so a breakpoint here can step into the two-merge case
+     return forcedMergePolicy.findForcedMerges(infos, maxSegmentCount, segmentsToMerge, mergeContext);
+ }
+ return spec;
}

+ @Override
+ public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
+     MergeSpecification spec = super.findMerges(mergeTrigger, segmentInfos, mergeContext);
+     if (spec != null && spec.merges != null && spec.merges.size() == 2) {
+         System.out.println(""); // no-op; serves only as a breakpoint anchor for the two-merge case
+     }
+     return spec;
+ }

@Override
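Side note on the `setDeletesPctAllowed(48)` experiment above: in the Lucene versions that expose this setter, `TieredMergePolicy` only accepts values in a bounded range (roughly 20 to 50 percent), so 48 sits near the loosest setting it will take, letting segments carry almost half deleted documents before merges reclaim them. A minimal standalone sketch, assuming only Lucene core on the classpath (this demo class is not part of the commit):

```java
import org.apache.lucene.index.TieredMergePolicy;

// Standalone illustration (not part of this commit): nudging deletesPctAllowed
// toward its upper bound defers delete-reclaiming merges as long as possible.
public class DeletesPctDemo {
    public static void main(String[] args) {
        TieredMergePolicy tmp = new TieredMergePolicy();
        tmp.setDeletesPctAllowed(48);                    // near the permitted maximum
        System.out.println(tmp.getDeletesPctAllowed()); // 48.0
        // values outside the permitted range throw IllegalArgumentException:
        // tmp.setDeletesPctAllowed(60);  // would fail
    }
}
```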
File 2/6: ElasticsearchConcurrentMergeScheduler.java
@@ -91,9 +91,8 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
OnGoingMerge onGoingMerge = new OnGoingMerge(merge);
onGoingMerges.add(onGoingMerge);

- if (logger.isTraceEnabled()) {
-     logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
- }
+ logger.error("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));

try {
beforeMerge(onGoingMerge);
super.doMerge(writer, merge);
@@ -134,9 +133,9 @@ protected void doMerge(IndexWriter writer, MergePolicy.OneMerge merge) throws IO
mbPerSec);

if (tookMS > 20000) { // if more than 20 seconds, DEBUG log it
logger.debug("{}", message);
logger.error("{}", message);
} else if (logger.isTraceEnabled()) {
logger.trace("{}", message);
logger.error("{}", message);
}
}
}
File 3/6: InternalEngine.java
@@ -1361,28 +1361,34 @@ public void writeIndexingBuffer() throws EngineException {
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
// best effort attempt before we acquire locks
ensureOpen();
- if (indexWriter.hasUncommittedChanges()) {
-     logger.trace("can't sync commit [{}]. have pending changes", syncId);
-     return SyncedFlushResult.PENDING_OPERATIONS;
+ if (mergeScheduler.onGoingMerges().size() > 0) {
+     logger.error("ongoing merges");
}

if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
logger.error("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.COMMIT_MISMATCH;
}
+ if (indexWriter.hasUncommittedChanges()) {
+     logger.error("can't sync commit [{}]. have pending changes, merges {}", syncId, mergeScheduler.onGoingMerges().size());
+     indexWriter.hasUncommittedChanges(); // result discarded; no-op call kept as a breakpoint anchor
+     return SyncedFlushResult.PENDING_OPERATIONS;
+ }
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
ensureCanFlush();
// lets do a refresh to make sure we shrink the version map. This refresh will be either a no-op (just shrink the version map)
// or we also have uncommitted changes and that causes this syncFlush to fail.
refresh("sync_flush", SearcherScope.INTERNAL);
- if (indexWriter.hasUncommittedChanges()) {
-     logger.trace("can't sync commit [{}]. have pending changes", syncId);
-     return SyncedFlushResult.PENDING_OPERATIONS;
- }
if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
return SyncedFlushResult.COMMIT_MISMATCH;
}
+ if (indexWriter.hasUncommittedChanges()) {
+     logger.error("can't sync commit [{}]. have pending changes", syncId);
+     indexWriter.hasUncommittedChanges(); // result discarded; no-op call kept as a breakpoint anchor
+     return SyncedFlushResult.PENDING_OPERATIONS;
+ }
logger.trace("starting sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
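The reordered checks above appear to chase a specific suspicion, visible in the new "have pending changes, merges {}" message: a merge finishing between the flush and the sync attempt re-dirties the writer, so `hasUncommittedChanges()` keeps returning true. A minimal standalone sketch of that API's behavior, assuming a recent Lucene (8+) on the classpath; this is plain Lucene, not Elasticsearch code and not part of the commit:

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;

// Standalone illustration of the check the traces above revolve around: any
// change after the last commit -- an indexed doc, and likewise a merge that
// finishes -- makes hasUncommittedChanges() return true again.
public class HasUncommittedChangesDemo {
    public static void main(String[] args) throws Exception {
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(),
                new IndexWriterConfig(new StandardAnalyzer()))) {
            writer.addDocument(new Document());
            System.out.println(writer.hasUncommittedChanges()); // true: pending doc
            writer.commit();
            System.out.println(writer.hasUncommittedChanges()); // false: clean commit point
        }
    }
}
```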
File 4/6: IndexShard.java
@@ -961,7 +961,7 @@ public CompletionStats completionStats(String... fields) {

public Engine.SyncedFlushResult syncFlush(String syncId, Engine.CommitId expectedCommitId) {
verifyNotClosed();
logger.trace("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
logger.error("trying to sync flush. sync id [{}]. expected commit id [{}]]", syncId, expectedCommitId);
Engine engine = getEngine();
if (engine.isRecovering()) {
throw new IllegalIndexShardStateException(shardId(), state, "syncFlush is only allowed if the engine is not recovery" +
File 5/6: SyncedFlushService.java
@@ -207,6 +207,7 @@ private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState s
final ActionListener<Map<String, PreSyncedFlushResponse>> presyncListener = new ActionListener<Map<String, PreSyncedFlushResponse>>() {
@Override
public void onResponse(final Map<String, PreSyncedFlushResponse> presyncResponses) {
logger.error("presyncResponses {}", presyncResponses.size());
if (presyncResponses.isEmpty()) {
actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "all shards failed to commit on pre-sync"));
return;
@@ -222,6 +223,7 @@ public void onResponse(InFlightOpsResponse response) {
// 3. now send the sync request to all the shards;
final String sharedSyncId = sharedExistingSyncId(presyncResponses);
if (sharedSyncId != null) {
logger.error("reusing sharedSyncId {}", shardId);
assert presyncResponses.values().stream().allMatch(r -> r.existingSyncId.equals(sharedSyncId)) :
"Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]";
reportSuccessWithExistingSyncId(shardId, sharedSyncId, activeShards, totalShards, presyncResponses, actionListener);
@@ -309,7 +311,7 @@ protected void getInflightOpsCount(final ShardId shardId, ClusterState state, In
listener.onResponse(new InFlightOpsResponse(-1));
return;
}
logger.trace("{} retrieving in flight operation count", shardId);
logger.error("{} retrieving in flight operation count", shardId);
transportService.sendRequest(primaryNode, IN_FLIGHT_OPS_ACTION_NAME, new InFlightOpsRequest(shardId),
new TransportResponseHandler<InFlightOpsResponse>() {
@Override
@@ -319,12 +321,13 @@ public InFlightOpsResponse newInstance() {

@Override
public void handleResponse(InFlightOpsResponse response) {
logger.error("{} receive in-flight count {}", shardId, response.opCount);
listener.onResponse(response);
}

@Override
public void handleException(TransportException exp) {
logger.debug("{} unexpected error while retrieving in flight op count", shardId);
logger.error("{} unexpected error while retrieving in flight op count", shardId);
listener.onFailure(exp);
}

@@ -358,14 +361,14 @@ void sendSyncRequests(final String syncId, final List<ShardRouting> shards, Clus
for (final ShardRouting shard : shards) {
final DiscoveryNode node = state.nodes().get(shard.currentNodeId());
if (node == null) {
logger.trace("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
logger.error("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new ShardSyncedFlushResponse("unknown node"));
countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
continue;
}
final PreSyncedFlushResponse preSyncedResponse = preSyncResponses.get(shard.currentNodeId());
if (preSyncedResponse == null) {
logger.trace("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
logger.error("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}", shardId, syncId, shard);
results.put(shard, new ShardSyncedFlushResponse("no commit id from pre-sync flush"));
countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
continue;
@@ -379,7 +382,7 @@ void sendSyncRequests(final String syncId, final List<ShardRouting> shards, Clus
countDownAndSendResponseIfDone(syncId, shards, shardId, totalShards, listener, countDown, results);
continue;
}
logger.trace("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
logger.error("{} sending synced flush request to {}. sync id [{}].", shardId, shard, syncId);
transportService.sendRequest(node, SYNCED_FLUSH_ACTION_NAME, new ShardSyncedFlushRequest(shard.shardId(), syncId, preSyncedResponse.commitId),
new TransportResponseHandler<ShardSyncedFlushResponse>() {
@Override
@@ -389,6 +392,7 @@ public ShardSyncedFlushResponse newInstance() {

@Override
public void handleResponse(ShardSyncedFlushResponse response) {
logger.error("Receive ShardSyncedFlushResponse {} /{} ", response.success(), response.failureReason);
ShardSyncedFlushResponse existing = results.put(shard, response);
assert existing == null : "got two answers for node [" + node + "]";
// count after the assert so we won't decrement twice in handleException
@@ -481,9 +485,9 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
logger.trace("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
logger.error("{} performing sync flush. sync id [{}], expected commit id {}", request.shardId(), request.syncId(), request.expectedCommitId());
Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(), request.expectedCommitId());
logger.trace("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
logger.error("{} sync flush done. sync id [{}], result [{}]", request.shardId(), request.syncId(), result);
switch (result) {
case SUCCESS:
return new ShardSyncedFlushResponse();
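Most of the traces in this file instrument the synced-flush coordination path, which runs in three phases (the "// 3. now send the sync request" comment above refers to them): a pre-sync flush that collects per-shard commit ids, an in-flight operations check that must come back zero, and the actual sync request fanned out to every shard copy. A toy sketch of that control flow; every name below is illustrative, not the real SyncedFlushService API:

```java
import java.util.List;
import java.util.Map;

// Toy model (all names hypothetical) of the three-phase coordination that the
// traces above follow; the real code is callback-based and per-shard-copy.
interface ShardOps {
    Map<String, String> preSyncFlush(List<String> shards); // phase 1: commit id per shard
    int inFlightOperations();                              // phase 2: must be 0 to proceed
    boolean sendSyncRequest(String shard, String syncId, String expectedCommitId); // phase 3
}

final class SyncedFlushSketch {
    static boolean attempt(ShardOps ops, List<String> shards, String syncId) {
        Map<String, String> commitIds = ops.preSyncFlush(shards);
        if (ops.inFlightOperations() != 0) {
            return false; // indexing still in flight; a sync id would not be safe
        }
        boolean allOk = true;
        for (String shard : shards) {
            // each copy only accepts the sync id if its commit id still matches
            allOk &= ops.sendSyncRequest(shard, syncId, commitIds.get(shard));
        }
        return allOk;
    }
}
```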
File 6/6: FlushIT.java
@@ -279,6 +279,8 @@ public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
}
}

logger.error("--------------------------- recommit -------------------------");

final ShardsSyncedFlushResult fullResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
assertBusy(() -> {
for (IndexShard shard : indexShards) {
