Skip to content

Commit

Permalink
rename proto to make semantic clear
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Oct 24, 2023
1 parent b945db7 commit 7658a84
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 60 deletions.
13 changes: 11 additions & 2 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,17 @@ message LoadFileRequest {

// A subtask of a load file request. either a load data or load metadata.
message LoadSubTask {
optional Block block = 1;
optional UfsStatus ufs_status = 2;
optional LoadDataSubTask load_data_subtask = 1;
optional UfsStatus load_metadata_subtask = 2;
}

message LoadDataSubTask {
required int64 length = 2;
optional string ufs_path = 3;
// The offset of the block in within ufs the file.
optional int64 offset_in_file = 4;
optional UfsStatus ufs_status = 6;
optional WorkerNetAddress main_worker = 7;
}

message File{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ public boolean processResponse(DoraLoadTask doraLoadTask) {
LOG.warn(format("[DistributedLoad] Get failure from worker:%s, failed files:%s",
doraLoadTask.getMyRunningWorker(), response.getFailuresList()));
for (LoadFailure failure : response.getFailuresList()) {
if (failure.getSubtask().hasBlock()) {
totalLoadedBytes -= failure.getSubtask().getBlock().getLength();
if (failure.getSubtask().hasLoadDataSubtask()) {
totalLoadedBytes -= failure.getSubtask().getLoadDataSubtask().getLength();
}
String status = Status.fromCodeValue(failure.getCode()).toString();
LoadSubTask subTask = LoadSubTask.from(failure, mVirtualBlockSize);
Expand All @@ -518,8 +518,8 @@ public boolean processResponse(DoraLoadTask doraLoadTask) {
}
int totalLoadedInodes = doraLoadTask.getSubTasks().stream()
.filter(LoadSubTask::isLoadMetadata).collect(Collectors.toList()).size()
- response.getFailuresList().stream().filter(i -> i.getSubtask().hasUfsStatus()).collect(
Collectors.toList()).size();
- response.getFailuresList().stream().filter(i -> i.getSubtask().hasLoadMetadataSubtask())
.collect(Collectors.toList()).size();
if (!mLoadMetadataOnly) {
addLoadedBytes(totalLoadedBytes - response.getBytesSkipped());
LOAD_FILE_SIZE.inc(totalLoadedBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package alluxio.master.job;

import alluxio.grpc.Block;
import alluxio.underfs.UfsStatus;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -50,10 +49,11 @@ boolean isLoadMetadata() {

@Override
alluxio.grpc.LoadSubTask toProto() {
Block block =
Block.newBuilder().setOffsetInFile(mOffset).setUfsPath(getUfsPath()).setLength(getLength())
.setUfsStatus(mUfsStatus.toProto()).build();
return alluxio.grpc.LoadSubTask.newBuilder().setBlock(block).build();
alluxio.grpc.LoadDataSubTask block =
alluxio.grpc.LoadDataSubTask.newBuilder().setOffsetInFile(mOffset).setUfsPath(getUfsPath())
.setLength(getLength()).setUfsStatus(mUfsStatus.toProto())
.build();
return alluxio.grpc.LoadSubTask.newBuilder().setLoadDataSubtask(block).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ boolean isLoadMetadata() {

@Override
alluxio.grpc.LoadSubTask toProto() {
return alluxio.grpc.LoadSubTask.newBuilder().setUfsStatus(mUfsStatus.toProto()).build();
return alluxio.grpc.LoadSubTask.newBuilder().setLoadMetadataSubtask(mUfsStatus.toProto())
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ public String getUfsPath() {
*/
public static LoadSubTask from(LoadFailure loadFailure, long virtualBlockSize) {
alluxio.grpc.LoadSubTask failure = loadFailure.getSubtask();
if (failure.hasUfsStatus()) {
return new LoadMetadataSubTask(UfsStatus.fromProto(failure.getUfsStatus()), virtualBlockSize);
if (failure.hasLoadMetadataSubtask()) {
return new LoadMetadataSubTask(UfsStatus.fromProto(failure.getLoadMetadataSubtask()),
virtualBlockSize);
}
else {
UfsStatus status = UfsStatus.fromProto(failure.getBlock().getUfsStatus());
return new LoadDataSubTask(status, virtualBlockSize, failure.getBlock().getOffsetInFile(),
failure.getBlock().getLength());
UfsStatus status = UfsStatus.fromProto(failure.getLoadDataSubtask().getUfsStatus());
return new LoadDataSubTask(status, virtualBlockSize,
failure.getLoadDataSubtask().getOffsetInFile(), failure.getLoadDataSubtask().getLength());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import alluxio.exception.runtime.FailedPreconditionRuntimeException;
import alluxio.exception.runtime.UnavailableRuntimeException;
import alluxio.exception.status.FailedPreconditionException;
import alluxio.grpc.Block;
import alluxio.grpc.Command;
import alluxio.grpc.CommandType;
import alluxio.grpc.CompleteFilePOptions;
Expand All @@ -49,6 +48,7 @@
import alluxio.grpc.GrpcService;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.LoadDataSubTask;
import alluxio.grpc.LoadFailure;
import alluxio.grpc.LoadFileResponse;
import alluxio.grpc.LoadMetadataPType;
Expand Down Expand Up @@ -548,13 +548,12 @@ public BlockWriter createFileWriter(String fileId, String ufsPath)
return new PagedFileWriter(this, ufsPath, mCacheManager, fileId, mPageSize);
}

private boolean isAllPageCached(Block block) {
alluxio.grpc.UfsStatus status = block.getUfsStatus();
private boolean isAllPageCached(alluxio.grpc.UfsStatus status, long offset, long length) {
String fileId = new AlluxioURI(status.getUfsFullPath()).hash();
List<PageId> cachedPages = mCacheManager.getCachedPageIdsByFileId(fileId,
status.getUfsFileStatus().getContentLength());
int numOfPagesInBlock = (int) (block.getLength() / mPageSize);
for (long pageIndex = block.getOffsetInFile() / mPageSize; pageIndex < numOfPagesInBlock;
int numOfPagesInBlock = (int) (length / mPageSize);
for (long pageIndex = offset / mPageSize; pageIndex < numOfPagesInBlock;
pageIndex++) {
PageId pageId = new PageId(fileId, pageIndex);
if (!cachedPages.contains(pageId)) {
Expand All @@ -572,28 +571,29 @@ public ListenableFuture<LoadFileResponse> load(List<LoadSubTask> subTasks, boole
AtomicInteger numSkipped = new AtomicInteger();
AtomicLong skippedLength = new AtomicLong();
for (LoadSubTask task : subTasks) {
if (task.hasUfsStatus()) {
UfsStatus status = UfsStatus.fromProto(task.getUfsStatus());
loadStatus(status, errors);
if (task.hasLoadMetadataSubtask()) {
UfsStatus status = UfsStatus.fromProto(task.getLoadMetadataSubtask());
loadMetadata(status, errors);
}
if (task.hasBlock()) {
Block block = task.getBlock();
if (block.getLength() <= 0) {
if (task.hasLoadDataSubtask()) {
LoadDataSubTask subtask = task.getLoadDataSubtask();
if (subtask.getLength() <= 0) {
continue;
}
boolean countAsSkipped = skipIfExists && isAllPageCached(block);
boolean countAsSkipped = skipIfExists && isAllPageCached(subtask.getUfsStatus(),
subtask.getOffsetInFile(), subtask.getLength());
if (countAsSkipped) {
numSkipped.incrementAndGet();
skippedLength.addAndGet(block.getLength());
skippedLength.addAndGet(subtask.getLength());
continue;
}
try {
ListenableFuture<Void> loadFuture = submitLoadDataSubTask(block, options, errors);
ListenableFuture<Void> loadFuture = submitLoadDataSubTask(subtask, options, errors);
futures.add(loadFuture);
} catch (RejectedExecutionException ex) {
LOG.warn("Load task overloaded.");
errors.add(LoadFailure.newBuilder()
.setSubtask(LoadSubTask.newBuilder().setBlock(block).build())
errors.add(LoadFailure.newBuilder().setSubtask(
LoadSubTask.newBuilder().setLoadDataSubtask(subtask).build())
.setCode(Status.RESOURCE_EXHAUSTED.getCode().value())
.setRetryable(true).setMessage(ex.getMessage()).build());
}
Expand Down Expand Up @@ -657,20 +657,20 @@ public void cacheData(String ufsPath, long length, long pos, boolean isAsync)
}

private ListenableFuture<Void> submitLoadDataSubTask(
Block block, UfsReadOptions options, List<LoadFailure> errors) {
LoadDataSubTask subTask, UfsReadOptions options, List<LoadFailure> errors) {
ListenableFuture<Void> future =
Futures.submit(() -> {
try {
if (options.hasUser()) {
AuthenticatedClientUser.set(options.getUser());
}
long fileLength = block.getUfsStatus().getUfsFileStatus().getContentLength();
if (block.hasMainWorker()) {
WorkerNetAddress address = GrpcUtils.fromProto(block.getMainWorker());
long fileLength = subTask.getUfsStatus().getUfsFileStatus().getContentLength();
if (subTask.hasMainWorker()) {
WorkerNetAddress address = GrpcUtils.fromProto(subTask.getMainWorker());
if (mAddress != address) {
long chunkSize = mPageSize;
Protocol.OpenUfsBlockOptions openOptions =
Protocol.OpenUfsBlockOptions.newBuilder().setUfsPath(block.getUfsPath())
Protocol.OpenUfsBlockOptions.newBuilder().setUfsPath(subTask.getUfsPath())
.setMountId(0).setNoCache(false)
.setOffsetInFile(0)
.setBlockSize(fileLength).build();
Expand All @@ -679,21 +679,21 @@ private ListenableFuture<Void> submitLoadDataSubTask(
.setOpenUfsBlockOptions(openOptions)
.setChunkSize(chunkSize);
try (PositionReader reader = new NettyDataReader(mFsContext, address, builder)) {
loadDataFromRemote(block.getUfsPath(), block.getOffsetInFile(), block.getLength(),
reader, (int) chunkSize);
loadDataFromRemote(subTask.getUfsPath(), subTask.getOffsetInFile(),
subTask.getLength(), reader, (int) chunkSize);
}
}
}
else {
loadData(block.getUfsPath(), 0, block.getOffsetInFile(), block.getLength(),
loadData(subTask.getUfsPath(), 0, subTask.getOffsetInFile(), subTask.getLength(),
fileLength);
}
} catch (Throwable e) {
LOG.error("Loading {} failed", block, e);
LOG.error("Loading {} failed", subTask, e);
boolean permissionCheckSucceeded = !(e instanceof AccessControlException);
AlluxioRuntimeException t = AlluxioRuntimeException.from(e);
errors.add(LoadFailure.newBuilder()
.setSubtask(LoadSubTask.newBuilder().setBlock(block).build())
errors.add(LoadFailure.newBuilder().setSubtask(
LoadSubTask.newBuilder().setLoadDataSubtask(subTask).build())
.setCode(t.getStatus().getCode().value())
.setRetryable(permissionCheckSucceeded).setMessage(t.getMessage())
.build());
Expand All @@ -718,7 +718,7 @@ private ListenableFuture<Void> submitLoadDataSubTask(
* @param status the ufs status
* @param errors the errors
*/
private void loadStatus(UfsStatus status, List<LoadFailure> errors) {
private void loadMetadata(UfsStatus status, List<LoadFailure> errors) {
String ufsFullPath = status.getUfsFullPath().toString();
Map<String, String> xattrMap = null;
UnderFileSystem ufs = getUfsInstance(ufsFullPath);
Expand All @@ -731,9 +731,8 @@ private void loadStatus(UfsStatus status, List<LoadFailure> errors) {
} catch (Exception e) {
LOG.error("Failed to put file status to meta manager", e);
AlluxioRuntimeException t = AlluxioRuntimeException.from(e);

errors.add(LoadFailure.newBuilder().setSubtask(
LoadSubTask.newBuilder().setUfsStatus(status.toProto()).build())
errors.add(LoadFailure.newBuilder().setSubtask(LoadSubTask.newBuilder()
.setLoadMetadataSubtask(status.toProto()).build())
.setCode(t.getStatus().getCode().value()).setRetryable(true)
.setMessage(t.getMessage()).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import alluxio.conf.PropertyKey;
import alluxio.exception.AccessControlException;
import alluxio.file.ReadTargetBuffer;
import alluxio.grpc.Block;
import alluxio.grpc.CompleteFilePOptions;
import alluxio.grpc.CreateDirectoryPOptions;
import alluxio.grpc.CreateFilePOptions;
Expand All @@ -38,6 +37,7 @@
import alluxio.grpc.FileSystemMasterCommonPOptions;
import alluxio.grpc.GetStatusPOptions;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.LoadDataSubTask;
import alluxio.grpc.LoadFileResponse;
import alluxio.grpc.LoadSubTask;
import alluxio.grpc.RenamePOptions;
Expand Down Expand Up @@ -145,13 +145,14 @@ public void testLoadDataWithOffsetLength() throws Exception {
UfsStatus ufsStatus = mWorker.getUfsInstance(ufsPath).getStatus(ufsPath);
ufsStatus.setUfsFullPath(new AlluxioURI(ufsPath));

Block block =
Block.newBuilder().setOffsetInFile(mPageSize).setLength(mPageSize * numCachedPages)
.setUfsPath(ufsPath).setUfsStatus(ufsStatus.toProto()).build();
ListenableFuture<LoadFileResponse> load =
mWorker.load(Collections.singletonList(LoadSubTask.newBuilder().setBlock(block).build()),
false, UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false)
.build());
LoadDataSubTask block = LoadDataSubTask.newBuilder().setOffsetInFile(mPageSize)
.setLength(mPageSize * numCachedPages)
.setUfsPath(ufsPath).setUfsStatus(ufsStatus.toProto())
.build();
ListenableFuture<LoadFileResponse> load = mWorker.load(
Collections.singletonList(LoadSubTask.newBuilder().setLoadDataSubtask(block).build()),
false,
UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build());
LoadFileResponse response = load.get(30, TimeUnit.SECONDS);
assertEquals(0, response.getFailuresCount());
List<PageId> cachedPages =
Expand All @@ -177,7 +178,7 @@ public void testLoadMetaDataOnly() throws Exception {
UfsStatus ufsStatus = mWorker.getUfsInstance(ufsPath).getStatus(ufsPath);
ufsStatus.setUfsFullPath(new AlluxioURI(ufsPath));
ListenableFuture<LoadFileResponse> load = mWorker.load(Collections.singletonList(
LoadSubTask.newBuilder().setUfsStatus(ufsStatus.toProto()).build()), false,
LoadSubTask.newBuilder().setLoadMetadataSubtask(ufsStatus.toProto()).build()), false,
UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build());
load.get(30, TimeUnit.SECONDS);
List<PageId> cachedPages =
Expand Down Expand Up @@ -899,12 +900,13 @@ private void loadFileData(String path)
UfsStatus ufsStatus = mWorker.getUfsInstance(path).getStatus(path);
ufsStatus.setUfsFullPath(new AlluxioURI(path));

Block block = Block.newBuilder().setLength(ufsStatus.asUfsFileStatus().getContentLength())
LoadDataSubTask block =
LoadDataSubTask.newBuilder().setLength(ufsStatus.asUfsFileStatus().getContentLength())
.setOffsetInFile(0).setUfsPath(ufsStatus.getUfsFullPath().toString())
.setUfsStatus(ufsStatus.toProto()).build();
ListenableFuture<LoadFileResponse> load = mWorker.load(
Arrays.asList(LoadSubTask.newBuilder().setUfsStatus(ufsStatus.toProto()).build(),
LoadSubTask.newBuilder().setBlock(block).build()), false,
Arrays.asList(LoadSubTask.newBuilder().setLoadMetadataSubtask(ufsStatus.toProto()).build(),
LoadSubTask.newBuilder().setLoadDataSubtask(block).build()), false,
UfsReadOptions.newBuilder().setUser("test").setTag("1").setPositionShort(false).build());
LoadFileResponse response = load.get(30, TimeUnit.SECONDS);
assertEquals(0, response.getFailuresCount());
Expand Down

0 comments on commit 7658a84

Please sign in to comment.