[Issue #736] pipeline execution of hash partitioned join #746

Merged: 49 commits, Oct 15, 2024
Changes from 1 commit

Commits (49)
6ed2a94
Add back WIP files
jasha64 Jun 16, 2024
2412101
Logging
jasha64 Jul 12, 2024
cc24104
Code optimization, reduce overheads by reduce calling ByteBuf.slice()
jasha64 Jul 12, 2024
e7f4e4a
Update filename list in stream worker unit tests, since I have pre-st…
jasha64 Aug 8, 2024
9a8504e
Get rid of FTP and write back to minio instead
jasha64 Aug 8, 2024
143586d
Revert -- writing to minio on shutdown gives an exception "Connection…
jasha64 Aug 9, 2024
3d1a716
stream invokers
jasha64 Aug 11, 2024
d483d38
stream operators - no need to wait for prev
jasha64 Aug 11, 2024
36817fa
Change unit test filename list
jasha64 Aug 11, 2024
e0f1791
Change unit test filename list
jasha64 Aug 11, 2024
7a608df
pixels planner HTTP storage info (dev version)
jasha64 Aug 11, 2024
17e4122
Merge branch 'pixelsdb:master' into dev2
jasha64 Aug 16, 2024
f49200c
Fix bug in aggregation stream operator
jasha64 Aug 30, 2024
39d28c9
Merge branch 'dev2' of github.com:jasha64/pixels into dev2
jasha64 Aug 30, 2024
3894f91
fix issue #681
huasiy Sep 4, 2024
b923a79
fix issue #681
huasiy Sep 4, 2024
cf6986f
Fix worker coordinate server unit test
jasha64 Sep 26, 2024
336985d
Merge branch 'hsy-dev' of github.com:jasha64/pixels into hsy-dev
jasha64 Sep 26, 2024
9292bd6
Worker coordination WIP
jasha64 Sep 29, 2024
c45313c
Using worker coordinate server, we do not need to manually map file n…
jasha64 Oct 1, 2024
22bdcc6
Modify pixels stream reader and writer impl. We now use URIs directly…
jasha64 Oct 4, 2024
2229970
Stream operator implemented - change to push-based model
jasha64 Oct 4, 2024
525804b
pixels-worker-vhive README
jasha64 Oct 4, 2024
5bae56b
Stream workers
jasha64 Oct 4, 2024
4734449
Clean up dependency and import
jasha64 Oct 6, 2024
862a575
Clean up dependency and import
jasha64 Oct 6, 2024
ab44e51
Correct copyright year
jasha64 Oct 6, 2024
79eb1e6
Revert dev purpose changes in install.sh
jasha64 Oct 6, 2024
156aaa4
Stylize, and cleanup commented out code
jasha64 Oct 12, 2024
9874245
Remove WIP files for merging
jasha64 Oct 12, 2024
8d6276e
Remove WIP files for merging
jasha64 Oct 12, 2024
3d89dc7
Merge branch 'master' into hsy-dev
jasha64 Oct 12, 2024
22f66e6
Stylize
jasha64 Oct 12, 2024
8fed574
Correct file header
jasha64 Oct 12, 2024
dc884bb
Comments
jasha64 Oct 12, 2024
c5caf2e
Comments
jasha64 Oct 12, 2024
86c2fd0
Refactor, move base stream workers to pixels-worker-common
jasha64 Oct 12, 2024
4ef5a5c
Fix conflict
jasha64 Oct 12, 2024
8ad18f3
Storage scheme for streaming mode
jasha64 Oct 12, 2024
e26a5c3
Remove WIP files for merging
jasha64 Oct 12, 2024
fda48a0
Comments
jasha64 Oct 13, 2024
b68d8a6
Tidy up constants
jasha64 Oct 14, 2024
c7532f3
Clean up commented out code in PixelsRecordReaderStreamImpl.close()
jasha64 Oct 14, 2024
c19a553
Revert additional logs in BaseAggregationWorker, because aggregation …
jasha64 Oct 14, 2024
777cc9c
Clean up commented out code
jasha64 Oct 14, 2024
145c4d1
Stylize
jasha64 Oct 14, 2024
22d2670
Correct file header
jasha64 Oct 14, 2024
f6d49ab
Update docs
jasha64 Oct 14, 2024
efb0945
Tidy up
jasha64 Oct 14, 2024
Commit c45313c2c3d1720dcaec554df9479390a7c8d577
Using worker coordinate server, we do not need to manually map file name to port number anymore
jasha64 committed Oct 1, 2024
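In short, upper-level workers are now addressed by full HTTP endpoints handed out by the worker coordinate server, and the writer simply turns those endpoint strings into URIs, instead of deriving a local port from a file name (getSchemaPort / getOrSetPort / fileNameToUri). Below is a minimal sketch of the new addressing model; the endpoint values and the class name are made up for illustration, and only the URI::create mapping comes from this commit.

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: endpoints here are placeholders; in the pipeline they come
// from the worker coordinate server rather than from a fileName-to-port mapping.
public class EndpointAddressingSketch
{
    public static void main(String[] args)
    {
        List<String> outputEndpoints = List.of(
                "http://10.0.0.12:18688/", "http://10.0.0.13:18688/");

        // The same conversion the writer now does at construction time:
        // one URI per upper-level worker, no port lookup needed.
        List<URI> uris = outputEndpoints.stream().map(URI::create).collect(Collectors.toList());
        uris.forEach(uri -> System.out.println("row groups for this partition go to " + uri));
    }
}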
@@ -43,6 +43,7 @@
 import java.util.TimeZone;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;

 import static com.google.common.base.Preconditions.checkArgument;
 import static io.netty.handler.codec.http.HttpHeaderNames.*;
@@ -146,9 +147,9 @@ public class PixelsWriterStreamImpl implements PixelsWriter
  * On the other hand, In partitioned mode, we send at most one row group to each upper-level worker (for now), and
  * so we do not need to translate fileName to URI at construction time.
  */
-private java.net.URI uri;
+private URI uri;
 private final String fileName;
-private final List<String> fileNames;
+private final List<URI> uris;

 private final AsyncHttpClient httpClient;
 /**
@@ -246,7 +247,7 @@ private PixelsWriterStreamImpl(TypeDescription schema, int pixelStride, int rowG
 this.byteBuf = Unpooled.directBuffer();
 this.uri = uri;
 this.fileName = fileName;
-this.fileNames = fileNames;
+this.uris = fileNames.stream().map(URI::create).collect(Collectors.toList());
 this.httpClient = Dsl.asyncHttpClient();
 }

@@ -560,7 +561,7 @@ else if (isFirstRowGroup)
 uri = URI.create(fileNameToUri(fileName));
 }
 Request req = httpClient
-.preparePost(partitioned ? fileNameToUri(fileNames.get(currHashValue)) : uri.toString())
+.preparePost(partitioned ? uris.get(currHashValue).toString() : uri.toString())
 .addHeader(CONTENT_TYPE, "application/x-protobuf")
 .addHeader(CONTENT_LENGTH, 0)
 .addHeader(CONNECTION, CLOSE)
@@ -754,8 +755,8 @@ private void writeRowGroup() throws IOException
 uri = URI.create(fileNameToUri(fileName));
 }
 logger.debug("Sending row group with length: " + byteBuf.writerIndex() +
-" to endpoint: " + (partitioned ? fileNameToUri(fileNames.get(currHashValue)) : uri.toString()));
-Request req = httpClient.preparePost(partitioned ? fileNameToUri(fileNames.get(currHashValue)) : uri.toString())
+" to endpoint: " + (partitioned ? uris.get(currHashValue) : uri.toString()));
+Request req = httpClient.preparePost(partitioned ? uris.get(currHashValue).toString() : uri.toString())
 .setBody(byteBuf.nioBuffer())
 .addHeader("X-Partition-Id", String.valueOf(partitionId))
 .addHeader(CONTENT_TYPE, "application/x-protobuf")
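With the uris list computed once in the constructor, the hot send path above no longer rebuilds an endpoint string via fileNameToUri for every row group; it just indexes the pre-computed URI for the current hash value. A rough sketch of that send path follows; it is not the actual worker code, and everything except the names visible in the diff (uris, currHashValue, partitionId, byteBuf) is assumed for illustration.

import io.netty.buffer.ByteBuf;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Request;
import java.net.URI;
import java.util.List;

// Sketch of building the POST request for one row group after this change.
public final class RowGroupPostSketch
{
    public static Request buildRowGroupRequest(AsyncHttpClient httpClient, boolean partitioned,
                                               List<URI> uris, int currHashValue, URI uri,
                                               int partitionId, ByteBuf byteBuf)
    {
        // Partitioned mode: pick the pre-computed URI of the upper-level worker that
        // owns this hash value. Non-partitioned mode: use the single writer URI.
        String endpoint = partitioned ? uris.get(currHashValue).toString() : uri.toString();
        return httpClient.preparePost(endpoint)
                .setBody(byteBuf.nioBuffer())                        // serialized row group
                .addHeader("X-Partition-Id", String.valueOf(partitionId))
                .addHeader("Content-Type", "application/x-protobuf")
                .build();
    }
}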
@@ -139,22 +139,21 @@ public static void getSchemaFromPaths(ExecutorService executor,
 Storage leftStorage, Storage rightStorage,
 AtomicReference<TypeDescription> leftSchema,
 AtomicReference<TypeDescription> rightSchema,
-List<String> leftPaths, List<String> rightPaths)
+List<String> leftEndpoints, List<String> rightEndpoints)
 {
 requireNonNull(executor, "executor is null");
 requireNonNull(leftSchema, "leftSchema is null");
 requireNonNull(rightSchema, "rightSchema is null");
-requireNonNull(leftPaths, "leftPaths is null");
-requireNonNull(rightPaths, "rightPaths is null");
+requireNonNull(leftEndpoints, "leftPaths is null");
+requireNonNull(rightEndpoints, "rightPaths is null");
 if (leftStorage == http && rightStorage == http)
 {
 // streaming mode
 // Currently, the first packet from the stream brings the schema
 Future<?> leftFuture = executor.submit(() -> {
 try
 {
-PixelsReader pixelsReader = new PixelsReaderStreamImpl(
-PixelsWriterStreamImpl.getSchemaPort(leftPaths.get(0)));
+PixelsReader pixelsReader = new PixelsReaderStreamImpl(leftEndpoints.get(0));
 leftSchema.set(pixelsReader.getFileSchema());
 pixelsReader.close();
 } catch (Exception e)
@@ -165,8 +164,7 @@ public static void getSchemaFromPaths(ExecutorService executor,
 Future<?> rightFuture = executor.submit(() -> {
 try
 {
-PixelsReader pixelsReader = new PixelsReaderStreamImpl(
-PixelsWriterStreamImpl.getSchemaPort(rightPaths.get(0)));
+PixelsReader pixelsReader = new PixelsReaderStreamImpl(rightEndpoints.get(0));
 rightSchema.set(pixelsReader.getFileSchema());
 pixelsReader.close();
 // XXX: This `close()` makes the test noticeably slower. Will need to look into it.
@@ -184,8 +182,8 @@ public static void getSchemaFromPaths(ExecutorService executor,
 logger.error("interrupted while waiting for the termination of schema read", e);
 }
 } else
-WorkerCommon.getFileSchemaFromPaths(executor, leftStorage, rightStorage, leftSchema, rightSchema, leftPaths,
-rightPaths);
+WorkerCommon.getFileSchemaFromPaths(executor, leftStorage, rightStorage, leftSchema, rightSchema,
+leftEndpoints, rightEndpoints);
 }

 public static PixelsReader getReader(String filePath, Storage storage) throws UnsupportedOperationException
@@ -198,19 +196,17 @@ public static PixelsReader getReader(Storage.Scheme storageScheme, String path)
 return getReader(storageScheme, path, false, -1);
 }

-public static PixelsReader getReader(Storage.Scheme storageScheme, String path, boolean partitioned,
+public static PixelsReader getReader(Storage.Scheme storageScheme, String endpoint, boolean partitioned,
 int numPartitions) throws Exception
 {
 requireNonNull(storageScheme, "storageInfo is null");
-requireNonNull(path, "fileName is null");
+requireNonNull(endpoint, "fileName is null");
 if (storageScheme == Storage.Scheme.mock)
 {
-logger.debug("getReader streaming mode, path: " + path +
-", port: " + PixelsWriterStreamImpl.getOrSetPort(path));
-return new PixelsReaderStreamImpl("http://localhost:" + PixelsWriterStreamImpl.getOrSetPort(path) + "/",
-partitioned, numPartitions);
+logger.debug("getReader streaming mode: " + endpoint);
+return new PixelsReaderStreamImpl(endpoint, partitioned, numPartitions);
 }
-else return WorkerCommon.getReader(path, WorkerCommon.getStorage(storageScheme));
+else return WorkerCommon.getReader(endpoint, WorkerCommon.getStorage(storageScheme));
 }

 public static PixelsWriter getWriter(TypeDescription schema, Storage storage,
@@ -230,17 +226,17 @@ public static PixelsWriter getWriter(TypeDescription schema, Storage storage,
 String outputPath, boolean encoding,
 boolean isPartitioned, int partitionId,
 List<Integer> keyColumnIds,
-List<String> outputPaths, boolean isSchemaWriter)
+List<String> outputEndpoints, boolean isSchemaWriter)
 {
 if (storage != null && storage.getScheme() != Storage.Scheme.mock)
 return WorkerCommon.getWriter(schema, storage, outputPath, encoding, isPartitioned, keyColumnIds);
-logger.debug("getWriter streaming mode, path: " + outputPath + ", paths: " + outputPaths +
+logger.debug("getWriter streaming mode, path: " + outputPath + ", endpoints: " + outputEndpoints +
 ", isSchemaWriter: " + isSchemaWriter);
 requireNonNull(schema, "schema is null");
 requireNonNull(outputPath, "fileName is null");
 checkArgument(!isPartitioned || keyColumnIds != null,
 "keyColumnIds is null whereas isPartitioned is true");
-checkArgument(!isPartitioned || outputPaths != null,
+checkArgument(!isPartitioned || outputEndpoints != null,
 "outputPaths is null whereas isPartitioned is true");

 PixelsWriterStreamImpl.Builder builder = PixelsWriterStreamImpl.newBuilder();
@@ -252,13 +248,11 @@ public static PixelsWriter getWriter(TypeDescription schema, Storage storage,
 .setPartitionId(isPartitioned ? partitionId : -1);
 if (!isPartitioned)
 {
-if (isSchemaWriter)
-builder.setUri(URI.create("http://localhost:" + PixelsWriterStreamImpl.getSchemaPort(outputPath) + "/"));
-builder.setFileName(outputPath);
+builder.setUri(URI.create(outputPath));
 }
 else
 {
-builder.setFileNames(outputPaths)
+builder.setFileNames(outputEndpoints)
 .setPartKeyColumnIds(keyColumnIds);
 }
 return builder.build();
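End to end, the streaming helpers now take endpoints rather than file names: getWriter points the builder at URI.create(outputPath) (or at the outputEndpoints list in partitioned mode), and getReader opens PixelsReaderStreamImpl directly on an endpoint. Since the first packet of the stream carries the schema, a downstream worker can fetch it with a short-lived reader. A hedged usage sketch follows; imports and surrounding setup are omitted, the endpoint values and partition count are placeholders, and only the constructor and method calls shown here appear in this PR.

// Placeholder endpoints; in practice they are assigned by the worker coordinate server.
String schemaEndpoint = "http://10.0.0.21:18688/";
String dataEndpoint = "http://10.0.0.22:18688/";

// The first packet on the stream carries the schema, so a short-lived reader suffices
// (the same pattern getSchemaFromPaths uses above).
PixelsReader schemaReader = new PixelsReaderStreamImpl(schemaEndpoint);
TypeDescription schema = schemaReader.getFileSchema();
schemaReader.close();

// A partitioned stream reader is opened directly on an endpoint, as in getReader above.
int numPartitions = 8;   // placeholder
PixelsReader reader = new PixelsReaderStreamImpl(dataEndpoint, true, numPartitions);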