Skip to content

Commit

Permalink
first attempt at tying into transport RefCounted use
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Nov 22, 2023
1 parent 67e74e1 commit a38a243
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public final class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequ
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);

private final BulkItemRequest[] items;
private boolean closed = false;

public BulkShardRequest(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -167,23 +166,18 @@ public void incRef() {
public boolean tryIncRef() {
boolean incremented = false;
for (BulkItemRequest item : items) {
incremented = incremented || item.tryIncRef();
incremented = item.tryIncRef() || incremented;
}
return incremented;
}

@Override
public boolean decRef() {
if (closed == false) {
boolean decremented = false;
for (BulkItemRequest item : items) {
decremented = decremented || item.decRef();
}
closed = true;
return decremented;
} else {
return true;
boolean decremented = false;
for (BulkItemRequest item : items) {
decremented = item.decRef() || decremented;
}
return decremented;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ protected void dispatchedShardOperationOnPrimary(
ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
request.incRef();
performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
assert update != null;
assert shardId != null;
Expand All @@ -159,6 +160,7 @@ public void onTimeout(TimeValue timeout) {
mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update"));
}
}), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction);
request.decRef();
}

@Override
Expand Down Expand Up @@ -210,6 +212,7 @@ public static void performOnPrimary(
@Nullable PostWriteRefresh postWriteRefresh,
@Nullable Consumer<Runnable> postWriteAction
) {
request.incRef();
new ActionRunnable<>(listener) {

private final Executor executor = threadPool.executor(executorName);
Expand Down Expand Up @@ -287,6 +290,7 @@ private void finishRequest() {
postWriteAction
)
);
request.decRef();
}
}.run();
}
Expand Down Expand Up @@ -349,23 +353,26 @@ static boolean executeBulkItemRequest(
);
} else {
final IndexRequest request = context.getRequestToExecute();
final SourceToParse sourceToParse = new SourceToParse(
request.id(),
request.source(),
request.getContentType(),
request.routing(),
request.getDynamicTemplates(),
request.pipelinesHaveRun() == false
);
result = primary.applyIndexOperationOnPrimary(
version,
request.versionType(),
sourceToParse,
request.ifSeqNo(),
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
request.isRetry()
);
try (
SourceToParse sourceToParse = new SourceToParse(
request.id(),
request.source(),
request.getContentType(),
request.routing(),
request.getDynamicTemplates(),
request.pipelinesHaveRun() == false
)
) {
result = primary.applyIndexOperationOnPrimary(
version,
request.versionType(),
sourceToParse,
request.ifSeqNo(),
request.ifPrimaryTerm(),
request.getAutoGeneratedTimestamp(),
request.isRetry()
);
}
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
@Nullable
private String routing;

private ReleasableBytesReference source;
private BytesReference source;

private OpType opType = OpType.INDEX;

Expand Down Expand Up @@ -379,7 +379,7 @@ public boolean isPipelineResolved() {
/**
* The source of the document to index, recopied to a new array if it is unsafe.
*/
public ReleasableBytesReference source() {
public BytesReference source() {
return source;
}

Expand Down Expand Up @@ -490,7 +490,7 @@ public IndexRequest source(XContentType xContentType, Object... source) {
* Sets the document to index in bytes form.
*/
public IndexRequest source(BytesReference source, XContentType xContentType) {
this.source = ReleasableBytesReference.wrap(Objects.requireNonNull(source));
this.source = Objects.requireNonNull(source);
this.contentType = Objects.requireNonNull(xContentType);
return this;
}
Expand Down Expand Up @@ -879,22 +879,33 @@ public List<String> getExecutedPipelines() {

@Override
public void incRef() {
source.tryIncRef();
if (source instanceof ReleasableBytesReference releasableSource) {
releasableSource.tryIncRef();
}
}

@Override
public boolean tryIncRef() {
return source.tryIncRef();
if (source instanceof ReleasableBytesReference releasableSource) {
return releasableSource.tryIncRef();
}
return true;
}

@Override
public boolean decRef() {
return source.decRef();
if (source instanceof ReleasableBytesReference releasableSource) {
return releasableSource.decRef();
}
return true;
}

@Override
public boolean hasReferences() {
return source.hasReferences();
if (source instanceof ReleasableBytesReference releasableSource) {
return releasableSource.hasReferences();
}
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,18 @@ protected void doRun() throws Exception {
actualTerm
);
}

acquirePrimaryOperationPermit(
indexShard,
primaryRequest.getRequest(),
ActionListener.wrap(releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)), e -> {
if (e instanceof ShardNotInPrimaryModeException) {
onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
} else {
onFailure(e);
}
})
);
// primaryRequest.incRef(); // TODO: I think we don't actually need this b/c we're still in the same thread here
acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(releasable -> {
runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable));
// primaryRequest.decRef();
}, e -> {
// primaryRequest.decRef();
if (e instanceof ShardNotInPrimaryModeException) {
onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
} else {
onFailure(e);
}
}));
}

void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
Expand Down Expand Up @@ -592,9 +592,10 @@ protected void handleReplicaRequest(
final Task task
) {
Releasable releasable = checkReplicaLimits(replicaRequest.getRequest());
replicaRequest.incRef();
ActionListener<ReplicaResponse> listener = ActionListener.runBefore(new ChannelActionListener<>(channel), () -> {
releasable.close();
replicaRequest.close();
replicaRequest.decRef();
});
try {
new AsyncReplicaAction(replicaRequest, listener, (ReplicationTask) task).run();
Expand Down Expand Up @@ -1301,8 +1302,7 @@ public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, l
/** a wrapper class to encapsulate a request when being sent to a specific allocation id **/
public static class ConcreteShardRequest<R extends TransportRequest> extends TransportRequest
implements
RawIndexingDataTransportRequest,
Releasable {
RawIndexingDataTransportRequest {

/** {@link AllocationId#getId()} of the shard this request is sent to **/
private final String targetAllocationID;
Expand Down Expand Up @@ -1423,12 +1423,27 @@ public String toString() {
}

@Override
public void close() {
request.decRef();
public void incRef() {
request.incRef();
}

@Override
public boolean tryIncRef() {
return request.tryIncRef();
}

@Override
public boolean decRef() {
return request.decRef();
}

@Override
public boolean hasReferences() {
return request.hasReferences();
}
}

protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> implements Releasable {
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {

private final long globalCheckpoint;
private final long maxSeqNoOfUpdatesOrDeletes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,12 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<ReplicaRequest, Response>> listener
) {
request.incRef();
threadPool.executor(executorFunction.apply(executorSelector, primary)).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
dispatchedShardOperationOnPrimary(request, primary, listener);
request.decRef();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ParsedDocument implements Releasable {

private final List<LuceneDocument> documents;

private ReleasableBytesReference source;
private BytesReference source;
private XContentType xContentType;

private Mapping dynamicMappingsUpdate;
Expand All @@ -59,7 +59,7 @@ public static ParsedDocument noopTombstone(String reason) {
"",
null,
Collections.singletonList(document),
ReleasableBytesReference.wrap(new BytesArray("{}")),
new BytesArray("{}"),
XContentType.JSON,
null
);
Expand All @@ -83,7 +83,7 @@ public static ParsedDocument deleteTombstone(String id) {
id,
null,
Collections.singletonList(document),
ReleasableBytesReference.wrap(new BytesArray("{}")),
new BytesArray("{}"),
XContentType.JSON,
null
);
Expand All @@ -99,20 +99,9 @@ public ParsedDocument(
XContentType xContentType,
Mapping dynamicMappingsUpdate
) {
this(version, seqID, id, routing, documents, ReleasableBytesReference.wrap(source), xContentType, dynamicMappingsUpdate);
}

public ParsedDocument(
Field version,
SeqNoFieldMapper.SequenceIDFields seqID,
String id,
String routing,
List<LuceneDocument> documents,
ReleasableBytesReference source,
XContentType xContentType,
Mapping dynamicMappingsUpdate
) {
source.incRef();
if (source instanceof ReleasableBytesReference releasableSource) {
releasableSource.incRef();
}
this.version = version;
this.seqID = seqID;
this.id = id;
Expand Down Expand Up @@ -191,6 +180,8 @@ public String documentDescription() {

@Override
public void close() {
source.decRef();
if (source instanceof ReleasableBytesReference releasableSource) {
releasableSource.decRef();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SourceToParse implements Releasable {

private final ReleasableBytesReference source;
private final BytesReference source;

private final String id;

Expand All @@ -45,7 +45,7 @@ public SourceToParse(
if (source instanceof ReleasableBytesReference releasableSource) {
this.source = releasableSource.retain();
} else {
this.source = ReleasableBytesReference.wrap(source);
this.source = source;
}
this.xContentType = Objects.requireNonNull(xContentType);
this.routing = routing;
Expand All @@ -65,7 +65,7 @@ public boolean toBeReported() {
return toBeReported;
}

public ReleasableBytesReference source() {
public BytesReference source() {
return this.source;
}

Expand Down Expand Up @@ -101,6 +101,8 @@ public XContentType getXContentType() {

@Override
public void close() {
this.source.decRef();
if (source instanceof ReleasableBytesReference releasableSource) {
releasableSource.decRef();
}
}
}

0 comments on commit a38a243

Please sign in to comment.