Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #262]Dledger polish project #263

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions command/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dledger-all</artifactId>
<groupId>io.openmessaging.storage</groupId>
Expand All @@ -28,7 +29,7 @@

<dependencies>
<dependency>
<groupId>io.openmessaging.storage</groupId>
<groupId>${project.groupId}</groupId>
<artifactId>dledger-proxy</artifactId>
</dependency>
<dependency>
Expand All @@ -44,12 +45,44 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>io.openmessaging.storage.dledger.command.BossCommand</mainClass>
</manifest>
</archive>
<finalName>DLedger-client</finalName>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2022 The DLedger Authors.
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -22,17 +22,17 @@
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.ReadFileRequest;
import io.openmessaging.storage.dledger.protocol.ReadFileResponse;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -132,6 +132,28 @@ public LeadershipTransferResponse leadershipTransfer(String curLeaderId, String
}
}

public ReadFileResponse readFile(final String dataFileDir, final long pos, final int fileSize, final long index,
final boolean readBodyOnly) {

try {
ReadFileRequest request = new ReadFileRequest();
request.setGroup(group);
request.setRemoteId(leaderId);
request.setDataFileDir(dataFileDir);
request.setPos(pos);
request.setFileSize(fileSize);
request.setIndex(index);
request.setReadBodyOnly(readBodyOnly);

return dLedgerClientRpcService.readFile(request).get();
} catch (Exception e) {
needFreshMetadata();
LOGGER.error("Read File from remote {} error", leaderId, e);
return new ReadFileResponse().code(DLedgerResponseCode.INTERNAL_ERROR.getCode());
}

}

public void startup() {
this.dLedgerClientRpcService.startup();
this.metadataUpdater.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2022 The DLedger Authors.
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -22,10 +22,12 @@
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.protocol.ReadFileRequest;
import io.openmessaging.storage.dledger.protocol.ReadFileResponse;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand Down Expand Up @@ -66,6 +68,15 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(Leadersh
return CompletableFuture.completedFuture(response);
}

@Override
public CompletableFuture<ReadFileResponse> readFile(ReadFileRequest request) throws Exception {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.READ_FILE.getCode(), null);
wrapperRequest.setBody(JSON.toJSONBytes(request));
RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(request.getRemoteId()), wrapperRequest, 10000);
ReadFileResponse response = JSON.parseObject(wrapperResponse.getBody(), ReadFileResponse.class);
return CompletableFuture.completedFuture(response);
}

@Override
public CompletableFuture<GetEntriesResponse> get(GetEntriesRequest request) throws Exception {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.GET.getCode(), null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2022 The DLedger Authors.
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@

package io.openmessaging.storage.dledger.client;


import io.openmessaging.storage.dledger.protocol.DLedgerClientProtocol;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import com.alibaba.fastjson.JSON;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppendCommand extends BaseCommand {
@Parameters(commandDescription = "Append data to DLedger server")
public class AppendCommand implements BaseCommand {

private static Logger logger = LoggerFactory.getLogger(AppendCommand.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright 2017-2022 The DLedger Authors.
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,7 +16,8 @@

package io.openmessaging.storage.dledger.command;

public abstract class BaseCommand {
public interface BaseCommand {

public abstract void doCommand();
default void doCommand() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.openmessaging.storage.dledger.command;

import com.beust.jcommander.JCommander;

import java.util.HashMap;
import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Get data from DLedger server")
public class GetCommand extends BaseCommand {
public class GetCommand implements BaseCommand {

private static Logger logger = LoggerFactory.getLogger(GetCommand.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Leadership transfer")
public class LeadershipTransferCommand extends BaseCommand {
public class LeadershipTransferCommand implements BaseCommand {

private static final Logger LOGGER = LoggerFactory.getLogger(LeadershipTransferCommand.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,28 @@
import com.alibaba.fastjson.JSON;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.entry.DLedgerEntryCoder;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import java.nio.ByteBuffer;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.ReadFileResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Read data from DLedger server data file")
public class ReadFileCommand extends BaseCommand {
public class ReadFileCommand implements BaseCommand {

private static Logger logger = LoggerFactory.getLogger(ReadFileCommand.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ReadFileCommand.class);

@Parameter(names = {"--group", "-g"}, description = "Group of this server")
private String group = "default";

@Parameter(names = {"--peers", "-p"}, description = "Peer info of this server")
private String peers = "n0-localhost:20911";

@Parameter(names = {"--dir", "-d"}, description = "the data dir")
private String dataDir = null;

@Parameter(names = {"--pos", "-p"}, description = "the start pos")
@Parameter(names = {"--pos", "-o"}, description = "the start position")
private long pos = 0;

@Parameter(names = {"--size", "-s"}, description = "the file size")
Expand All @@ -51,30 +54,16 @@ public class ReadFileCommand extends BaseCommand {

@Override
public void doCommand() {
if (index != -1) {
pos = index * DLedgerMmapFileStore.INDEX_UNIT_SIZE;
if (size == -1) {
size = DLedgerMmapFileStore.INDEX_UNIT_SIZE * 1024 * 1024;
}
} else {
if (size == -1) {
size = 1024 * 1024 * 1024;
}
}
MmapFileList mmapFileList = new MmapFileList(dataDir, size);
mmapFileList.load();
MmapFile mmapFile = mmapFileList.findMappedFileByOffset(pos);
if (mmapFile == null) {
logger.info("Cannot find the file");
DLedgerClient dLedgerClient = new DLedgerClient(group, peers);
dLedgerClient.startup();
ReadFileResponse response = dLedgerClient.readFile(dataDir, pos, size, index, readBody);
if (null == response || response.getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
LOGGER.warn(JSON.toJSONString(response));
return;
}
SelectMmapBufferResult result = mmapFile.selectMappedBuffer((int) (pos % size));
ByteBuffer buffer = result.getByteBuffer();
if (index != -1) {
logger.info("magic={} pos={} size={} index={} term={}", buffer.getInt(), buffer.getLong(), buffer.getInt(), buffer.getLong(), buffer.getLong());
} else {
DLedgerEntry entry = DLedgerEntryCoder.decode(buffer, readBody);
logger.info(JSON.toJSONString(entry));
}
DLedgerEntry entry = response.getDLedgerEntry();
LOGGER.info(JSON.toJSONString(entry));

dLedgerClient.shutdown();
}
}
Loading