Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #176] Realize the ability to read from the replica node,Optimize read performance #179

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
Expand Down Expand Up @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
this(dLedgerServer, null, null, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.dLedgerServer = dLedgerServer;
this.memberState = dLedgerServer.getMemberState();
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
Expand Down Expand Up @@ -109,6 +113,7 @@ public boolean rejectRequest() {
this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.PULL_READ_INDEX.getCode(), protocolProcessor, null);

//start the remoting client
if (nettyClientConfig == null) {
Expand Down Expand Up @@ -252,9 +257,29 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw
return future;
}

@Override
public CompletableFuture<PullReadIndexResponse> pullReadIndex(PullReadIndexRequest request) throws Exception {
CompletableFuture<PullReadIndexResponse> future = new CompletableFuture<>();
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL_READ_INDEX.getCode(), null);
wrapperRequest.setBody(JSON.toJSONBytes(request));
remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
PullReadIndexResponse response;
if (null != responseCommand) {
response = JSON.parseObject(responseCommand.getBody(), PullReadIndexResponse.class);
} else {
response = new PullReadIndexResponse();
response.copyBaseInfo(request);
response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
}
future.complete(response);
});
return future;
}

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
try {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
Expand Down Expand Up @@ -283,7 +308,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
}

private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
ChannelHandlerContext ctx) {
ChannelHandlerContext ctx) {
RemotingCommand response = null;
try {
if (t != null) {
Expand Down Expand Up @@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
case METADATA: {
MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class);
CompletableFuture<MetadataResponse> future = handleMetadata(metadataRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case APPEND: {
AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
CompletableFuture<AppendEntryResponse> future = handleAppend(appendEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case GET: {
GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class);
CompletableFuture<GetEntriesResponse> future = handleGet(getEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PULL: {
PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class);
CompletableFuture<PullEntriesResponse> future = handlePull(pullEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PUSH: {
PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class);
CompletableFuture<PushEntryResponse> future = handlePush(pushEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case VOTE: {
VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class);
CompletableFuture<VoteResponse> future = handleVote(voteRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case HEART_BEAT: {
HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class);
CompletableFuture<HeartBeatResponse> future = handleHeartBeat(heartBeatRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case LEADERSHIP_TRANSFER: {
Expand All @@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
request, x, DLedgerUtils.elapsed(start));
request, x, DLedgerUtils.elapsed(start));
}, futureExecutor);
break;
}
case PULL_READ_INDEX: {
PullReadIndexRequest pullReadIndexRequest = JSON.parseObject(request.getBody(), PullReadIndexRequest.class);
CompletableFuture<PullReadIndexResponse> future = handlePullReadIndex(pullReadIndexRequest);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
default:
logger.error("Unknown request code {} from {}", request.getCode(), request);
break;
Expand All @@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
}

Expand Down Expand Up @@ -432,6 +449,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
return dLedgerServer.handlePush(request);
}

@Override
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
return dLedgerServer.handlePullReadIndex(request);
}

public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
remotingCommand.setBody(JSON.toJSONBytes(response));
Expand Down
111 changes: 100 additions & 11 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
Expand All @@ -44,19 +46,17 @@
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand Down Expand Up @@ -211,7 +211,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
DLedgerEntry resEntry = null;
// split bodys to append
// split bodies to append
int index = 0;
Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
while (iterator.hasNext()) {
Expand All @@ -226,8 +226,8 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
batchAppendFuture.setPositions(positions);
return batchAppendFuture;
}
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODIES, "BatchAppendEntryRequest" +
" with empty bodies");
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
Expand All @@ -246,16 +246,58 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
}

@Override
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex());
PreConditions.check(!memberState.isCandidate(), DLedgerResponseCode.IS_CANDIDATE);
GetEntriesResponse response = new GetEntriesResponse();
response.setGroup(memberState.getGroup());
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
Long requestIndex = request.getBeginIndex();
if (memberState.isFollower()) {
//Get from follower
if (requestIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

// when requestIndex greater than ledgerEndIndex then send pull readIndex(ledgerEndIndex) request to leader
PullReadIndexRequest indexRequest = new PullReadIndexRequest();
indexRequest.setGroup(request.getGroup());
indexRequest.setRemoteId(memberState.getLeaderId());
CompletableFuture<PullReadIndexResponse> future = dLedgerRpcService.pullReadIndex(indexRequest);
PullReadIndexResponse pullReadIndexResponse = future.get();
if (pullReadIndexResponse.getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(pullReadIndexResponse.getCode());
return CompletableFuture.completedFuture(response);
}

long readIndex = pullReadIndexResponse.getReadIndex();
if (requestIndex > readIndex) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(DLedgerResponseCode.INDEX_OUT_OF_RANGE.getCode());
return CompletableFuture.completedFuture(response);
}

if (readIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

//wait for follower ledgerEndIndex to update
if (!waitFollowerEndIndex2Update(2, TimeUnit.SECONDS, requestIndex)) {
logger.warn("update follower[{}] ledgerEndIndex time out", memberState.getSelfId());
response.setCode(DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode());
return CompletableFuture.completedFuture(response);
}
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
} else {
//get from leader
getEntry(response, requestIndex);
}
return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
Expand All @@ -268,6 +310,13 @@ public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request
}
}

private void getEntry(GetEntriesResponse response, Long requestIndex) {
DLedgerEntry entry = dLedgerStore.get(requestIndex);
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
}
}

@Override
public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
try {
Expand Down Expand Up @@ -311,6 +360,30 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)

}

@Override
public CompletableFuture<PullReadIndexResponse> handlePullReadIndex(PullReadIndexRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

PullReadIndexResponse response = new PullReadIndexResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getLeaderId());
response.setEndIndex(memberState.getLedgerEndIndex());
response.setReadIndex(memberState.getLedgerEndIndex());

return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
logger.error("[{}][HandlePullReadIndex] failed", memberState.getSelfId(), e);
PullReadIndexResponse response = new PullReadIndexResponse();
response.copyBaseInfo(request);
response.setCode(e.getCode().getCode());
response.setLeaderId(memberState.getLeaderId());
return CompletableFuture.completedFuture(response);
}
}

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
Expand Down Expand Up @@ -486,4 +559,20 @@ public NettyRemotingClient getRemotingClient() {
return null;
}

private boolean waitFollowerEndIndex2Update(long maxWaitTime, TimeUnit unit, long requestIndex) {
long maxWaitMs = unit.toMillis(maxWaitTime);
long start = System.currentTimeMillis();
while (DLedgerUtils.elapsed(start) < maxWaitMs) {
try {
if (requestIndex <= memberState.getLedgerEndIndex()) {
return true;
}
DLedgerUtils.sleep(1);
} catch (Exception e) {
logger.warn("Wait [{}]Follower update endIndex error",memberState.getSelfId(),e);
break;
}
}
return false;
}
}
Loading