Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #736] support streaming broadcast join #760

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

huasiy
Copy link
Collaborator

@huasiy huasiy commented Oct 23, 2024

Part of #736.

@huasiy
Copy link
Collaborator Author

huasiy commented Oct 23, 2024

image

@huasiy huasiy requested a review from bianhq October 23, 2024 03:00
@bianhq bianhq changed the title Support tpch Q14 [Issue #736] support tpch Q14 Oct 23, 2024
@bianhq bianhq changed the title [Issue #736] support tpch Q14 [Issue #736] support pipeline execution of broadcast join Oct 23, 2024
@bianhq bianhq changed the title [Issue #736] support pipeline execution of broadcast join [Issue #736] support streaming broadcast join Oct 23, 2024
}
else if (joinAlgo == JoinAlgorithm.BROADCAST_CHAIN)
{
// joinOutputs[i] = InvokerFactory.Instance()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not support broadcast chain join?

CompletableFuture<CompletableFuture<? extends Output>[]> smallChildFuture = null;
if (smallChild != null)
{
throw new InterruptedException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not execute the child operator?

CompletableFuture<CompletableFuture<? extends Output>[]> largeChildFuture = null;
if (largeChild != null)
{
throw new InterruptedException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not execute the child operator?

if (smallChildFuture != null)
{
CompletableFuture<? extends Output>[] smallChildOutputs = smallChildFuture.join();
waitForCompletion(smallChildOutputs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For streaming execution, it should not wait for the child operator to complete.

if (largeChildFuture != null)
{
CompletableFuture<? extends Output>[] largeChildOutputs = largeChildFuture.join();
waitForCompletion(largeChildOutputs, LargeSideCompletionRatio);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For streaming execution, it should not wait for the child operator to complete.

@@ -0,0 +1,56 @@
/*
* Copyright 2023 PixelsDB.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -0,0 +1,31 @@
package io.pixelsdb.pixels.invoker.vhive;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add license header.

@Override
public CompletableFuture<Output> invoke(Input input)
{
// log.info(String.format("invoke BroadcastJoinStreamInput: %s", JSON.toJSONString(input, SerializerFeature.PrettyFormat, SerializerFeature.DisableCircularReferenceDetect)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this log if you do not need it.

String coordinatorIp = WorkerCommon.getCoordinatorIp();
int coordinatorPort = WorkerCommon.getCoordinatorPort();
CFWorkerInfo workerInfo = new CFWorkerInfo(ip, port, transId, stageId, "broadcast_join", Collections.emptyList());
workerCoordinatorService = new WorkerCoordinateService(coordinatorIp, coordinatorPort);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is workerCoordiantorService used anywhere?

@bianhq bianhq added the enhancement New feature or request label Oct 23, 2024
@huasiy huasiy force-pushed the broadcast_join_stream branch 4 times, most recently from 05c5e81 to 598bf72 Compare October 28, 2024 01:55
@huasiy huasiy force-pushed the broadcast_join_stream branch from 598bf72 to 119fe31 Compare October 28, 2024 02:52
@huasiy huasiy force-pushed the broadcast_join_stream branch from e0245f6 to 326949d Compare October 28, 2024 05:52
@huasiy huasiy force-pushed the broadcast_join_stream branch from 69805f9 to cf79158 Compare October 28, 2024 13:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants