Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #736] pipeline execution of hash partitioned join #746

Merged
merged 49 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
6ed2a94
Add back WIP files
jasha64 Jun 16, 2024
2412101
Logging
jasha64 Jul 12, 2024
cc24104
Code optimization, reduce overheads by reduce calling ByteBuf.slice()
jasha64 Jul 12, 2024
e7f4e4a
Update filename list in stream worker unit tests, since I have pre-st…
jasha64 Aug 8, 2024
9a8504e
Get rid of FTP and write back to minio instead
jasha64 Aug 8, 2024
143586d
Revert -- writing to minio on shutdown gives an exception "Connection…
jasha64 Aug 9, 2024
3d1a716
stream invokers
jasha64 Aug 11, 2024
d483d38
stream operators - no need to wait for prev
jasha64 Aug 11, 2024
36817fa
Change unit test filename list
jasha64 Aug 11, 2024
e0f1791
Change unit test filename list
jasha64 Aug 11, 2024
7a608df
pixels planner HTTP storage info (dev version)
jasha64 Aug 11, 2024
17e4122
Merge branch 'pixelsdb:master' into dev2
jasha64 Aug 16, 2024
f49200c
Fix bug in aggregation stream operator
jasha64 Aug 30, 2024
39d28c9
Merge branch 'dev2' of github.com:jasha64/pixels into dev2
jasha64 Aug 30, 2024
3894f91
fix issue #681
huasiy Sep 4, 2024
b923a79
fix issue #681
huasiy Sep 4, 2024
cf6986f
Fix worker coordinate server unit test
jasha64 Sep 26, 2024
336985d
Merge branch 'hsy-dev' of github.com:jasha64/pixels into hsy-dev
jasha64 Sep 26, 2024
9292bd6
Worker coordination WIP
jasha64 Sep 29, 2024
c45313c
Using worker coordinate server, we do not need to manually map file n…
jasha64 Oct 1, 2024
22bdcc6
Modify pixels stream reader and writer impl. We now use URIs directly…
jasha64 Oct 4, 2024
2229970
Stream operator implemented - change to push-based model
jasha64 Oct 4, 2024
525804b
pixels-worker-vhive README
jasha64 Oct 4, 2024
5bae56b
Stream workers
jasha64 Oct 4, 2024
4734449
Clean up dependency and import
jasha64 Oct 6, 2024
862a575
Clean up dependency and import
jasha64 Oct 6, 2024
ab44e51
Correct copyright year
jasha64 Oct 6, 2024
79eb1e6
Revert dev purpose changes in install.sh
jasha64 Oct 6, 2024
156aaa4
Stylize, and cleanup commented out code
jasha64 Oct 12, 2024
9874245
Remove WIP files for merging
jasha64 Oct 12, 2024
8d6276e
Remove WIP files for merging
jasha64 Oct 12, 2024
3d89dc7
Merge branch 'master' into hsy-dev
jasha64 Oct 12, 2024
22f66e6
Stylize
jasha64 Oct 12, 2024
8fed574
Correct file header
jasha64 Oct 12, 2024
dc884bb
Comments
jasha64 Oct 12, 2024
c5caf2e
Comments
jasha64 Oct 12, 2024
86c2fd0
Refactor, move base stream workers to pixels-worker-common
jasha64 Oct 12, 2024
4ef5a5c
Fix conflict
jasha64 Oct 12, 2024
8ad18f3
Storage scheme for streaming mode
jasha64 Oct 12, 2024
e26a5c3
Remove WIP files for merging
jasha64 Oct 12, 2024
fda48a0
Comments
jasha64 Oct 13, 2024
b68d8a6
Tidy up constants
jasha64 Oct 14, 2024
c7532f3
Clean up commented out code in PixelsRecordReaderStreamImpl.close()
jasha64 Oct 14, 2024
c19a553
Revert additional logs in BaseAggregationWorker, because aggregation …
jasha64 Oct 14, 2024
777cc9c
Clean up commented out code
jasha64 Oct 14, 2024
145c4d1
Stylize
jasha64 Oct 14, 2024
22d2670
Correct file header
jasha64 Oct 14, 2024
f6d49ab
Update docs
jasha64 Oct 14, 2024
efb0945
Tidy up
jasha64 Oct 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum Scheme
minio, // Minio
redis, // Redis
gcs, // google cloud storage
httpstream,
mock; // mock

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public Input(long transId)
this.transId = transId;
// Issue #468: operatorName is optional, it is to be set by the setter.
this.operatorName = null;
this.stageId = -1;
}

public long getTransId()
Expand All @@ -57,9 +58,14 @@ public void setTransId(long transId)
this.transId = transId;
}

public void setStageId(int stageId) { this.stageId = stageId; }

public int getStageId() { return stageId; }
public int getStageId()
{
return stageId;
}
public void setStageId(int stageId)
{
this.stageId = stageId;
}

/**
* Operator name is optional, it might be null if not set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public enum WorkerType
{
UNKNOWN, // The first enum value is the default value.
SCAN, SCAN_STREAM,
PARTITION,
PARTITION, PARTITION_STREAMING,
BROADCAST_JOIN,
BROADCAST_CHAIN_JOIN,
PARTITIONED_JOIN,
PARTITIONED_JOIN, PARTITIONED_JOIN_STREAMING,
PARTITIONED_CHAIN_JOIN,
AGGREGATION;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,28 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
FullHttpRequest req = (FullHttpRequest) msg;
if (req.method() != HttpMethod.POST)
{
sendResponseAndClose(ctx, req, NOT_FOUND);
sendResponseAndClose(ctx, req, NOT_FOUND, false);
return;
}
if (!Objects.equals(req.headers().get("Content-Type"), "application/x-protobuf"))
{
// silent reject
return;
}
int partitionId = Integer.parseInt(req.headers().get("X-Partition-Id"));
logger.debug("Incoming packet on port: " + httpPort +
", content_length header: " + req.headers().get("content-length") +
", connection header: " + req.headers().get("connection") +
", partition ID header: " + req.headers().get("X-Partition-Id") +
", HTTP request object body total length: " + req.content().readableBytes());

// schema packet: only 1 packet expected, so close the connection immediately
// partitioned mode: close the connection if all partitions received
// else (non-partitioned mode, data packet): close connection if empty packet received
boolean needCloseParentChannel = partitionId == PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER ||
(partitioned && numPartitionsReceived.get() == numPartitions) ||
(Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) &&
req.content().readableBytes() == 0);
ByteBuf byteBuf = req.content();
try
{
Expand All @@ -166,40 +174,42 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
// due to concurrent modifications of the `recordReaders` list
recordReader.lateInitialization(streamHeader);
}
} catch (IOException e)
}
catch (IOException e)
{
logger.error("Invalid stream header values: ", e);
sendResponseAndClose(ctx, req, BAD_REQUEST);
sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
return;
}
} else if (partitioned)
}
else if (partitioned)
{
// In partitioned mode, every packet brings a streamHeader to prevent errors from possibly
// out-of-order packet arrivals, so we need to parse it, but do not need the return value
// (except for the first incoming packet processed above).
parseStreamHeader(byteBuf);
}
} catch (InvalidProtocolBufferException | IndexOutOfBoundsException e)
}
catch (InvalidProtocolBufferException | IndexOutOfBoundsException e)
{
logger.error("Malformed or corrupted stream header", e);
sendResponseAndClose(ctx, req, BAD_REQUEST);
sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
return;
}

// We only need to put the byteBuf into the blocking queue to pass it to the recordReader, if the
// client is a data writer (port >= 50100) rather than a schema writer. In the latter case,
// client is a data writer rather than a schema writer. In the latter case,
// the schema packet has been processed when parsing the stream header above.
if (httpPort >= 50100)
if (partitionId != PixelsWriterStreamImpl.PARTITION_ID_SCHEMA_WRITER)
{
byteBuf.retain();
if (!partitioned) byteBufSharedQueue.add(byteBuf);
else
{
int partitionId = Integer.parseInt(req.headers().get("X-Partition-Id"));
if (partitionId < 0 || partitionId >= numPartitions)
{
logger.warn("Client sent invalid partitionId value: " + partitionId);
sendResponseAndClose(ctx, req, BAD_REQUEST);
sendResponseAndClose(ctx, req, BAD_REQUEST, needCloseParentChannel);
return;
}
byteBufBlockingMap.put(partitionId, byteBuf);
Expand All @@ -212,10 +222,11 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
}
}

sendResponseAndClose(ctx, req, HttpResponseStatus.OK);
sendResponseAndClose(ctx, req, HttpResponseStatus.OK, needCloseParentChannel);
}

private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req, HttpResponseStatus status)
private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req, HttpResponseStatus status,
boolean needCloseParentChannel)
{
FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), status);
response.headers()
Expand All @@ -233,18 +244,11 @@ private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req
});
if (Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()))
f.addListener(ChannelFutureListener.CLOSE);
// schema port: only 1 packet expected, so close the connection immediately
// partitioned mode: close connection if all partitions received
// else (non-partitioned mode, data port): close connection if empty packet received
if (httpPort < 50100 || (partitioned && numPartitionsReceived.get() == numPartitions) ||
(Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) &&
req.content().readableBytes() == 0))
if (needCloseParentChannel)
{
f.addListener(future -> {
// shutdown the server
ctx.channel().parent().close().addListener(ChannelFutureListener.CLOSE);
// Can delete the port from the map of open ports, but not necessary
// PixelsWriterStreamImpl.delPort(httpPort);
});
}
}
Expand All @@ -253,7 +257,8 @@ private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req
try
{
this.httpServer.serve(httpPort);
} catch (InterruptedException e)
}
catch (InterruptedException e)
{
logger.error("HTTP server interrupted", e);
}
Expand Down Expand Up @@ -412,7 +417,8 @@ public TypeDescription getFileSchema()
try
{
streamHeaderLatch.await();
} catch (InterruptedException e)
}
catch (InterruptedException e)
{
logger.error("Interrupted while waiting for stream header", e);
}
Expand All @@ -434,7 +440,8 @@ public boolean isPartitioned()
try
{
streamHeaderLatch.await();
} catch (InterruptedException e)
}
catch (InterruptedException e)
{
logger.error("Interrupted while waiting for stream header", e);
}
Expand Down Expand Up @@ -524,19 +531,22 @@ public void close()
{
streamHeaderLatch.await();
while (!byteBufSharedQueue.isEmpty() && !byteBufBlockingMap.isEmpty()) sleep(20);
} catch (InterruptedException e)
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}

try
{
if (!this.httpServerFuture.isDone()) this.httpServerFuture.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e)
}
catch (TimeoutException e)
{
logger.warn("In close(), HTTP server did not shut down in 5 seconds, doing forceful shutdown");
this.httpServerFuture.cancel(true);
} catch (InterruptedException | ExecutionException e)
}
catch (InterruptedException | ExecutionException e)
{
logger.error("Exception during HTTP server shutdown", e);
} finally
Expand All @@ -546,7 +556,8 @@ public void close()
try
{
recordReader.close();
} catch (IOException e)
}
catch (IOException e)
{
logger.error("Exception while closing record reader", e);
}
Expand Down
Loading