Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #736] support streaming broadcast join #760

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
public class Worker<WI extends WorkerInfo>
{
private final long workerId;
private int workerPortIndex;
private final Lease lease;
private final WI workerInfo;
private boolean terminated;

public Worker(long workerId, Lease lease, WI workerInfo)
public Worker(long workerId, Lease lease, int workerPortIndex, WI workerInfo)
{
this.workerId = workerId;
this.workerPortIndex = workerPortIndex;
this.lease = requireNonNull(lease, "lease is null");
this.workerInfo = requireNonNull(workerInfo, "worker info is null");
this.terminated = false;
Expand All @@ -48,6 +50,10 @@ public long getWorkerId()
return workerId;
}

/**
 * Updates the index of the port this worker communicates on.
 *
 * @param index the new worker port index
 */
public void setWorkerPortIndex(int index)
{
    this.workerPortIndex = index;
}

/**
 * @return the index of the port this worker communicates on
 */
public int getWorkerPortIndex()
{
    return workerPortIndex;
}

public WI getWorkerInfo()
{
return workerInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public enum WorkerType
UNKNOWN, // The first enum value is the default value.
SCAN, SCAN_STREAM,
PARTITION, PARTITION_STREAMING,
BROADCAST_JOIN,
BROADCAST_JOIN, BROADCAST_JOIN_STREAMING,
BROADCAST_CHAIN_JOIN,
PARTITIONED_JOIN, PARTITIONED_JOIN_STREAMING,
PARTITIONED_CHAIN_JOIN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public PixelsReaderStreamImpl(String endpoint, boolean partitioned, int numParti
URI uri = new URI(endpoint);
String IP = uri.getHost();
int httpPort = uri.getPort();
logger.debug("In Pixels stream reader constructor, IP: " + IP + ", port: " + httpPort +
", partitioned: " + partitioned + ", numPartitions: " + numPartitions);
logger.info("In Pixels stream reader constructor, IP: {}, port: {}, partitioned: {}, numPartitions: {}"
, IP, httpPort, partitioned, numPartitions);
if (!Objects.equals(IP, "127.0.0.1") && !Objects.equals(IP, "localhost"))
{
throw new UnsupportedOperationException("Currently, only localhost is supported as the server address");
Expand Down Expand Up @@ -145,7 +145,7 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
return;
}
int partitionId = Integer.parseInt(req.headers().get("X-Partition-Id"));
logger.debug("Incoming packet on port: " + httpPort +
logger.info("Incoming packet on port: " + httpPort +
", content_length header: " + req.headers().get("content-length") +
", connection header: " + req.headers().get("connection") +
", partition ID header: " + req.headers().get("X-Partition-Id") +
Expand All @@ -159,10 +159,12 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg)
(Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) &&
req.content().readableBytes() == 0);
ByteBuf byteBuf = req.content();
logger.info("the content of request {}", byteBuf.toString());
try
{
if (streamHeader == null)
{
logger.info("this is stream header");
try
{
streamHeader = parseStreamHeader(byteBuf);
Expand Down Expand Up @@ -248,6 +250,7 @@ private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req
{
f.addListener(future -> {
// shutdown the server
logger.info("http server close");
ctx.channel().parent().close().addListener(ChannelFutureListener.CLOSE);
});
}
Expand Down Expand Up @@ -301,7 +304,6 @@ private PixelsStreamProto.StreamHeader parseStreamHeader(ByteBuf byteBuf)
ByteBuf metadataBuf = Unpooled.buffer(metadataLength);
byteBuf.getBytes(magicLength + Integer.BYTES, metadataBuf);
PixelsStreamProto.StreamHeader streamHeader = PixelsStreamProto.StreamHeader.parseFrom(metadataBuf.nioBuffer());

// check file version
int fileVersion = streamHeader.getVersion();
if (!PixelsVersion.matchVersion(fileVersion))
Expand All @@ -317,6 +319,7 @@ private PixelsStreamProto.StreamHeader parseStreamHeader(ByteBuf byteBuf)
}
// At this point, the readerIndex of the byteBuf is past the streamHeader and at the start of
// the actual rowGroups.
logger.info("stream header: {} , the readerIndex is {}", streamHeader.toString(), byteBuf.readerIndex());

this.fileSchema = TypeDescription.createSchema(streamHeader.getTypesList());
return streamHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ public boolean isPartitioned()
@Override
public boolean addRowBatch(VectorizedRowBatch rowBatch) throws IOException
{
logger.info("writer add row batch size {}", rowBatch.size);
checkArgument(!partitioned,
"this file is hash partitioned, use addRowBatch(rowBatch, hashValue) instead");
/**
Expand All @@ -487,6 +488,7 @@ public boolean addRowBatch(VectorizedRowBatch rowBatch) throws IOException
// If the current row group size has exceeded the row group size, write current row group.
if (curRowGroupDataLength >= rowGroupSize)
{
logger.info("curRowGroupDataLength {} larger than rowGroupSize {}", curRowGroupDataLength, rowGroupSize);
writeRowGroup();
curRowGroupNumOfRows = 0L;
return false;
Expand Down Expand Up @@ -550,25 +552,11 @@ public void close()
{
try
{
if (curRowGroupNumOfRows != 0)
{
writeRowGroup();
}
// If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially
// send an empty row group here before closing,
// so that the HTTP server can properly move on and close.
else if (isFirstRowGroup)
{
writeRowGroup();
isFirstRowGroup = false;
}

// In non-partitioned mode and for data servers, we send a close request with empty content to the server.
// In partitioned mode, the server closes automatically when it receives all its partitions. No need to send
// a close request.
// Schema servers also close automatically and do not need close requests.
if (!partitioned && partitionId != PARTITION_ID_SCHEMA_WRITER)
logger.info("writer close, curRowGroupNumOfRows {}", curRowGroupNumOfRows);
writeRowGroup();
if (curRowGroupNumOfRows > 0)
{
logger.info("send end stream packet to endpoint {}", this.uri);
if (!partitioned && uri == null)
{
uri = URI.create(fileNameToUri(fileName));
Expand All @@ -587,19 +575,68 @@ else if (isFirstRowGroup)
throw new IOException("Failed to send close request to server. Is the server already closed? " +
"HTTP status code: " + response.getStatusCode());
}
}
for (ColumnWriter cw : columnWriters)
{
cw.close();
}
columnWriterService.shutdown();
columnWriterService.shutdownNow();

for (ColumnWriter cw : columnWriters)
{
cw.close();
if (byteBuf.refCnt() > 0)
{
byteBuf.release();
}
}
columnWriterService.shutdown();
columnWriterService.shutdownNow();
// if (curRowGroupNumOfRows != 0)
// {
// writeRowGroup();
// }
// If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially
// send an empty row group here before closing,
// so that the HTTP server can properly move on and close.
// else if (isFirstRowGroup)
// {
// writeRowGroup();
// isFirstRowGroup = false;
// }

if (byteBuf.refCnt() > 0)
{
byteBuf.release();
}
// In non-partitioned mode and for data servers, we send a close request with empty content to the server.
// In partitioned mode, the server closes automatically when it receives all its partitions. No need to send
// a close request.
// Schema servers also close automatically and do not need close requests.
// if (!partitioned && partitionId != PARTITION_ID_SCHEMA_WRITER)
// {
// if (!partitioned && uri == null)
// {
// uri = URI.create(fileNameToUri(fileName));
// }
// Request req = httpClient
// .preparePost(partitioned ? uris.get(currHashValue).toString() : uri.toString())
// .addHeader(CONTENT_TYPE, "application/x-protobuf")
// .addHeader(CONTENT_LENGTH, 0)
// .addHeader(CONNECTION, CLOSE)
// .build();
//
// outstandingHTTPRequestSemaphore.acquire();
// Response response = httpClient.executeRequest(req).get();
// if (response.getStatusCode() != 200)
// {
// throw new IOException("Failed to send close request to server. Is the server already closed? " +
// "HTTP status code: " + response.getStatusCode());
// }
// }
//
// for (ColumnWriter cw : columnWriters)
// {
// cw.close();
// }
// columnWriterService.shutdown();
// columnWriterService.shutdownNow();
//
// if (byteBuf.refCnt() > 0)
// {
// byteBuf.release();
// }
}
catch (Exception e)
{
Expand Down Expand Up @@ -769,7 +806,7 @@ private void writeRowGroup() throws IOException
uri = URI.create(fileNameToUri(fileName));
}
String reqUri = partitioned ? uris.get(currHashValue).toString() : uri.toString();
logger.debug("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri);
logger.info("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri);
Request req = httpClient.preparePost(reqUri)
.setBody(byteBuf.nioBuffer())
.addHeader("X-Partition-Id", String.valueOf(partitionId))
Expand All @@ -793,6 +830,7 @@ private void writeRowGroup() throws IOException

while (!success)
{
logger.info("try to send row group");
try
{
CompletableFuture<Response> future = new CompletableFuture<>();
Expand All @@ -808,6 +846,7 @@ public Response onCompleted(Response response) throws Exception
throw new IOException("Failed to send row group to server, status code: " + response.getStatusCode());
}
outstandingHTTPRequestSemaphore.release();
logger.info("succeed to get response");
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse)
{
acquireNewRowGroup(reuse);
if (endOfFile) return createEmptyEOFRowBatch(0);
logger.debug("In readBatch(), new row group " + curRGIdx);
logger.info("In readBatch(), new row group " + curRGIdx);
}

if (!checkValid || endOfFile)
Expand Down Expand Up @@ -580,7 +580,7 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse)

private void acquireNewRowGroup(boolean reuse) throws IOException
{
logger.debug("In acquireNewRowGroup(), curRGIdx = " + curRGIdx);
logger.info("In acquireNewRowGroup(), curRGIdx = " + curRGIdx);
if (!endOfFile)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.pixelsdb.pixels.core.vector.VectorizedRowBatch;
import io.pixelsdb.pixels.executor.utils.HashTable;
import io.pixelsdb.pixels.executor.utils.Tuple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.*;
Expand All @@ -50,6 +52,7 @@
*/
public class Joiner
{
private static final Logger log = LogManager.getLogger(Joiner.class);
private final HashTable smallTable = new HashTable();
private final JoinType joinType;
private final TypeDescription smallSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CFWorkerInfo implements WorkerInfo
private final int stageId;
private final String operatorName;
private final List<Integer> hashValues;
private boolean passSchema;

public CFWorkerInfo(String ip, int port, long transId, int stageId,
String operatorName, List<Integer> hashValues)
Expand All @@ -46,6 +47,7 @@ public CFWorkerInfo(String ip, int port, long transId, int stageId,
this.stageId = stageId;
this.operatorName = operatorName;
this.hashValues = hashValues;
this.passSchema = false;
}

public CFWorkerInfo(TurboProto.WorkerInfo workerInfo)
Expand All @@ -56,8 +58,13 @@ public CFWorkerInfo(TurboProto.WorkerInfo workerInfo)
this.stageId = workerInfo.getStageId();
this.operatorName = workerInfo.getOperatorName();
this.hashValues = workerInfo.getHashValuesList();
this.passSchema = false;
}

/**
 * Sets whether this worker is responsible for passing the schema downstream.
 *
 * @param passSchema true if this worker should pass the schema
 */
public void setPassSchema(boolean passSchema)
{
    this.passSchema = passSchema;
}

/**
 * @return true if this worker is responsible for passing the schema downstream
 */
public boolean getPassSchema()
{
    return passSchema;
}

public String getIp()
{
return ip;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public void addStageCoordinator(StageCoordinator stageCoordinator, StageDependen
checkArgument(stageCoordinator.getStageId() == stageDependency.getCurrentStageId(),
"the stageDependency does not belong to the stageCoordinator");
this.stageCoordinators.put(stageId, stageCoordinator);
if (stageDependency.getDownStreamStageId() != -1)
{
StageCoordinator parentStageCoordinator = this.stageCoordinators.get(stageDependency.getDownStreamStageId());
stageCoordinator.setDownStreamWorkerNum(parentStageCoordinator.getFixedWorkerNum());
}
this.stageDependencies.put(stageId, stageDependency);
}

Expand Down
Loading