-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update worker api support for load multi replicas #18296
Conversation
Automated checks report:
Some checks failed. Please fix the reported issues and reply 'alluxio-bot, check this please' to re-run checks. |
Automated checks report:
All checks passed! |
} | ||
long fileLength = block.getUfsStatus().getUfsFileStatus().getContentLength(); | ||
if (block.hasMainWorker()) { | ||
WorkerNetAddress address = GrpcUtils.fromProto(block.getMainWorker()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
main worker is set from scheduler? if we r submitting load tasks to all replicas at same time, how to ensure the main worker would have loaded at the time of secondary worker trying to read from it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we don't ensure that due to the concurrency issue of getAndLoad. So we would recommend to user to load one replica first and then set multiple replicas to ensure they only read the data from ufs once. If we want to further improve this part we can improve the getAndLoad in CacheManager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
understood, fix getAndLoad to save only one read is out of scope on this PR, this PR targets at only save duplicate read from multiple workers not duplicate read from one worker.
@VisibleForTesting | ||
public void loadDataFromRemote(String filePath, long offset, long lengthToLoad, | ||
PositionReader reader, int chunkSize) throws IOException { | ||
ByteBuffer buf = ByteBuffer.allocate(chunkSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use PooledDirectNioByteBuf.allocate(chunkSize) as loadData does since most of it will be aligned to pagesize, and pass in NettyBufTargetBuffer type for reader.read(long position, ReadTargetBuffer buffer, int length) as this pool can manage a reuse of these mostly aligned buffer. otherwise this is allocate onheap will cause huge mem footprint and put heavylifting on GC
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh u use ByteBuffer bcos of cachemanager only support bytebuffer, then its better to allocate direct buffer than heap, large buffer mem footprint on heap might cause program misbehavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use direct buffer
ByteBuffer buf = ByteBuffer.allocate(chunkSize); | ||
String fileId = new AlluxioURI(filePath).hash(); | ||
|
||
while (0 < lengthToLoad) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lengthToLoad > 0 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -608,6 +608,7 @@ message LoadJobPOptions { | |||
optional bool partialListing = 3; | |||
optional bool loadMetadataOnly = 4; | |||
optional bool skipIfExists = 5; | |||
optional int32 replicas = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
im assuming this is not used in this PR right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be used in following PR
try (PositionReader reader = new NettyDataReader(mFsContext, address, builder)) { | ||
loadDataFromRemote(block.getUfsPath(), block.getOffsetInFile(), block.getLength(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now this is worker-to-worker communication, but reusing client-side code. This can cause some metrics to go inaccurate as the worker being requested cannot know whether the peer is really a client or another worker.
Can we have a different request type other than Protocol.ReadRequest
so that the other worker can tell whether this is a normal read request from a true client, or from a peer worker for caching purposes? This allows splitting the code paths and reduce intertwined code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The NettyDataReader
should be moved into the common module, also it should be able to handle a generic *Request
that involves data transmission.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would propose to add a field user
in the ReadRequest to indicate who is issuing the read so we can distinguish the reader(client or worker) cause iterally worker
is just another user to send the read request. And I would try to do the refactoring of NettyDataReader in a later PR so we can have limited scope in this PR
@@ -226,6 +226,7 @@ message Block{ | |||
optional int64 offset_in_file = 4; | |||
optional int64 mountId = 5; | |||
optional UfsStatus ufs_status = 6; | |||
optional WorkerNetAddress main_worker = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is the main worker of a block defined? Is it a consistent relationship between a block and a main worker? What happens when the main worker is not available?
If the client wants to express the idea that "for this load job and this particular block, load from this worker," then I don't think we need to involve the concept of a main worker. Instead, you can define the LoadRequest
object as
message LoadRequest {
message BlockLoadRequest {
optional Block blockToLoad = 1;
optional WorkerNetAddress workerToLoadFrom = 2;
}
repeated BlockLoadRequest blocks = 1;
// ... other fields
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a new proto message loaddatasubtask
to reduce confusion.
7658a84
to
a0bfcac
Compare
optional Block block = 1; | ||
optional UfsStatus ufs_status = 2; | ||
optional LoadDataSubTask load_data_subtask = 1; | ||
optional UfsStatus load_metadata_subtask = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in that sense its better to make a LoadMetadataSubTask in future refactor? but no need to address in current PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, avoid future incompatibility
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
alluxio-bot, merge this please |
merge failed: |
alluxio-bot, merge this please |
### What changes are proposed in this pull request? Update worker api support for load multi replicas ### Why are the changes needed? part of PR to support load multi replicas ### Does this PR introduce any user facing changes? na pr-link: Alluxio#18296 change-id: cid-0213f2aba669b7687ac42cf932cdcec911d397a4
What changes are proposed in this pull request?
Update worker api support for load multi replicas
Why are the changes needed?
part of PR to support load multi replicas
Does this PR introduce any user facing changes?
na