diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java index cd94cfe7f..739b65307 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/task/Worker.java @@ -31,13 +31,15 @@ public class Worker { private final long workerId; + private int workerPortIndex; private final Lease lease; private final WI workerInfo; private boolean terminated; - public Worker(long workerId, Lease lease, WI workerInfo) + public Worker(long workerId, Lease lease, int workerPortIndex, WI workerInfo) { this.workerId = workerId; + this.workerPortIndex = workerPortIndex; this.lease = requireNonNull(lease, "lease is null"); this.workerInfo = requireNonNull(workerInfo, "worker info is null"); this.terminated = false; @@ -48,6 +50,10 @@ public long getWorkerId() return workerId; } + public void setWorkerPortIndex(int index) { this.workerPortIndex = index; } + + public int getWorkerPortIndex() { return workerPortIndex; } + public WI getWorkerInfo() { return workerInfo; diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/turbo/WorkerType.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/turbo/WorkerType.java index 3931c8197..0733e3c03 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/turbo/WorkerType.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/turbo/WorkerType.java @@ -28,7 +28,7 @@ public enum WorkerType UNKNOWN, // The first enum value is the default value. SCAN, SCAN_STREAM, PARTITION, PARTITION_STREAMING, - BROADCAST_JOIN, + BROADCAST_JOIN, BROADCAST_JOIN_STREAMING, BROADCAST_CHAIN_JOIN, PARTITIONED_JOIN, PARTITIONED_JOIN_STREAMING, PARTITIONED_CHAIN_JOIN, diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java index 581da9b4f..9d5e1719a 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsReaderStreamImpl.java @@ -114,8 +114,8 @@ public PixelsReaderStreamImpl(String endpoint, boolean partitioned, int numParti URI uri = new URI(endpoint); String IP = uri.getHost(); int httpPort = uri.getPort(); - logger.debug("In Pixels stream reader constructor, IP: " + IP + ", port: " + httpPort + - ", partitioned: " + partitioned + ", numPartitions: " + numPartitions); + logger.info("In Pixels stream reader constructor, IP: {}, port: {}, partitioned: {}, numPartitions: {}" + , IP, httpPort, partitioned, numPartitions); if (!Objects.equals(IP, "127.0.0.1") && !Objects.equals(IP, "localhost")) { throw new UnsupportedOperationException("Currently, only localhost is supported as the server address"); @@ -145,7 +145,7 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) return; } int partitionId = Integer.parseInt(req.headers().get("X-Partition-Id")); - logger.debug("Incoming packet on port: " + httpPort + + logger.info("Incoming packet on port: " + httpPort + ", content_length header: " + req.headers().get("content-length") + ", connection header: " + req.headers().get("connection") + ", partition ID header: " + req.headers().get("X-Partition-Id") + @@ -159,10 +159,12 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) (Objects.equals(req.headers().get(CONNECTION), CLOSE.toString()) && req.content().readableBytes() == 0); ByteBuf byteBuf = 
req.content(); + logger.info("the content of request {}", byteBuf.toString()); try { if (streamHeader == null) { + logger.info("this is stream header"); try { streamHeader = parseStreamHeader(byteBuf); @@ -248,6 +250,7 @@ private void sendResponseAndClose(ChannelHandlerContext ctx, FullHttpRequest req { f.addListener(future -> { // shutdown the server + logger.info("http server close"); ctx.channel().parent().close().addListener(ChannelFutureListener.CLOSE); }); } @@ -301,7 +304,6 @@ private PixelsStreamProto.StreamHeader parseStreamHeader(ByteBuf byteBuf) ByteBuf metadataBuf = Unpooled.buffer(metadataLength); byteBuf.getBytes(magicLength + Integer.BYTES, metadataBuf); PixelsStreamProto.StreamHeader streamHeader = PixelsStreamProto.StreamHeader.parseFrom(metadataBuf.nioBuffer()); - // check file version int fileVersion = streamHeader.getVersion(); if (!PixelsVersion.matchVersion(fileVersion)) @@ -317,6 +319,7 @@ private PixelsStreamProto.StreamHeader parseStreamHeader(ByteBuf byteBuf) } // At this point, the readerIndex of the byteBuf is past the streamHeader and at the start of // the actual rowGroups. + logger.info("stream header: {} , the readerIndex is {}", streamHeader.toString(), byteBuf.readerIndex()); this.fileSchema = TypeDescription.createSchema(streamHeader.getTypesList()); return streamHeader; diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java index 127aeb1f0..5f0c9b437 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java @@ -474,6 +474,7 @@ public boolean isPartitioned() @Override public boolean addRowBatch(VectorizedRowBatch rowBatch) throws IOException { + logger.info("writer add row batch size {}", rowBatch.size); checkArgument(!partitioned, "this file is hash partitioned, use addRowBatch(rowBatch, hashValue) instead"); /** @@ -487,6 +488,7 @@ public boolean addRowBatch(VectorizedRowBatch rowBatch) throws IOException // If the current row group size has exceeded the row group size, write current row group. if (curRowGroupDataLength >= rowGroupSize) { + logger.info("curRowGroupDataLength {} larger than rowGroupSize {}", curRowGroupDataLength, rowGroupSize); writeRowGroup(); curRowGroupNumOfRows = 0L; return false; @@ -550,25 +552,11 @@ public void close() { try { - if (curRowGroupNumOfRows != 0) - { - writeRowGroup(); - } - // If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially - // send an empty row group here before closing, - // so that the HTTP server can properly move on and close. - else if (isFirstRowGroup) - { - writeRowGroup(); - isFirstRowGroup = false; - } - - // In non-partitioned mode and for data servers, we send a close request with empty content to the server. - // In partitioned mode, the server closes automatically when it receives all its partitions. No need to send - // a close request. - // Schema servers also close automatically and do not need close requests. 
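For reference, a minimal sketch of the end-of-stream request that the reworked close() sends to the data server and that PixelsReaderStreamImpl's channelRead0() treats as the last packet: an empty POST carrying a Connection: close header. It reuses the org.asynchttpclient API already used by PixelsWriterStreamImpl; the endpoint and the literal header names here are illustrative rather than taken from the patch's constants.

import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;

public class EndOfStreamRequestSketch
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical data-server endpoint; real endpoints come from the writer's output URIs.
        String endpoint = "http://localhost:18686/";
        try (AsyncHttpClient httpClient = Dsl.asyncHttpClient())
        {
            // An empty POST with Connection: close tells the reader-side HTTP server that the
            // stream is finished, so it can respond and then shut itself down.
            Request req = httpClient.preparePost(endpoint)
                    .addHeader("Content-Type", "application/x-protobuf")
                    .addHeader("Content-Length", "0")
                    .addHeader("Connection", "close")
                    .build();
            Response response = httpClient.executeRequest(req).get();
            if (response.getStatusCode() != 200)
            {
                throw new java.io.IOException("failed to close the stream, HTTP status: "
                        + response.getStatusCode());
            }
        }
    }
}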
- if (!partitioned && partitionId != PARTITION_ID_SCHEMA_WRITER) + logger.info("writer close, curRowGroupNumOfRows {}", curRowGroupNumOfRows); + writeRowGroup(); + if (curRowGroupNumOfRows > 0) { + logger.info("send end stream packet to endpoint {}", this.uri); if (!partitioned && uri == null) { uri = URI.create(fileNameToUri(fileName)); @@ -587,19 +575,68 @@ else if (isFirstRowGroup) throw new IOException("Failed to send close request to server. Is the server already closed? " + "HTTP status code: " + response.getStatusCode()); } - } + for (ColumnWriter cw : columnWriters) + { + cw.close(); + } + columnWriterService.shutdown(); + columnWriterService.shutdownNow(); - for (ColumnWriter cw : columnWriters) - { - cw.close(); + if (byteBuf.refCnt() > 0) + { + byteBuf.release(); + } } - columnWriterService.shutdown(); - columnWriterService.shutdownNow(); +// if (curRowGroupNumOfRows != 0) +// { +// writeRowGroup(); +// } + // If the outgoing stream is empty (addRowBatch() and thus writeRowGroup() never called), we artificially + // send an empty row group here before closing, + // so that the HTTP server can properly move on and close. +// else if (isFirstRowGroup) +// { +// writeRowGroup(); +// isFirstRowGroup = false; +// } - if (byteBuf.refCnt() > 0) - { - byteBuf.release(); - } + // In non-partitioned mode and for data servers, we send a close request with empty content to the server. + // In partitioned mode, the server closes automatically when it receives all its partitions. No need to send + // a close request. + // Schema servers also close automatically and do not need close requests. +// if (!partitioned && partitionId != PARTITION_ID_SCHEMA_WRITER) +// { +// if (!partitioned && uri == null) +// { +// uri = URI.create(fileNameToUri(fileName)); +// } +// Request req = httpClient +// .preparePost(partitioned ? uris.get(currHashValue).toString() : uri.toString()) +// .addHeader(CONTENT_TYPE, "application/x-protobuf") +// .addHeader(CONTENT_LENGTH, 0) +// .addHeader(CONNECTION, CLOSE) +// .build(); +// +// outstandingHTTPRequestSemaphore.acquire(); +// Response response = httpClient.executeRequest(req).get(); +// if (response.getStatusCode() != 200) +// { +// throw new IOException("Failed to send close request to server. Is the server already closed? " + +// "HTTP status code: " + response.getStatusCode()); +// } +// } +// +// for (ColumnWriter cw : columnWriters) +// { +// cw.close(); +// } +// columnWriterService.shutdown(); +// columnWriterService.shutdownNow(); +// +// if (byteBuf.refCnt() > 0) +// { +// byteBuf.release(); +// } } catch (Exception e) { @@ -769,7 +806,7 @@ private void writeRowGroup() throws IOException uri = URI.create(fileNameToUri(fileName)); } String reqUri = partitioned ? 
uris.get(currHashValue).toString() : uri.toString(); - logger.debug("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri); + logger.info("Sending row group with length: " + byteBuf.writerIndex() + " to endpoint: " + reqUri); Request req = httpClient.preparePost(reqUri) .setBody(byteBuf.nioBuffer()) .addHeader("X-Partition-Id", String.valueOf(partitionId)) @@ -793,6 +830,7 @@ private void writeRowGroup() throws IOException while (!success) { + logger.info("try to send row group"); try { CompletableFuture future = new CompletableFuture<>(); @@ -808,6 +846,7 @@ public Response onCompleted(Response response) throws Exception throw new IOException("Failed to send row group to server, status code: " + response.getStatusCode()); } outstandingHTTPRequestSemaphore.release(); + logger.info("succeed to get response"); return response; } diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java index 0588cbd47..ece5415c8 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderStreamImpl.java @@ -449,7 +449,7 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse) { acquireNewRowGroup(reuse); if (endOfFile) return createEmptyEOFRowBatch(0); - logger.debug("In readBatch(), new row group " + curRGIdx); + logger.info("In readBatch(), new row group " + curRGIdx); } if (!checkValid || endOfFile) @@ -580,7 +580,7 @@ public VectorizedRowBatch readBatch(int batchSize, boolean reuse) private void acquireNewRowGroup(boolean reuse) throws IOException { - logger.debug("In acquireNewRowGroup(), curRGIdx = " + curRGIdx); + logger.info("In acquireNewRowGroup(), curRGIdx = " + curRGIdx); if (!endOfFile) { try diff --git a/pixels-executor/src/main/java/io/pixelsdb/pixels/executor/join/Joiner.java b/pixels-executor/src/main/java/io/pixelsdb/pixels/executor/join/Joiner.java index e7fab28ba..c3f10f926 100644 --- a/pixels-executor/src/main/java/io/pixelsdb/pixels/executor/join/Joiner.java +++ b/pixels-executor/src/main/java/io/pixelsdb/pixels/executor/join/Joiner.java @@ -24,6 +24,8 @@ import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; import io.pixelsdb.pixels.executor.utils.HashTable; import io.pixelsdb.pixels.executor.utils.Tuple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.*; @@ -50,6 +52,7 @@ */ public class Joiner { + private static final Logger log = LogManager.getLogger(Joiner.class); private final HashTable smallTable = new HashTable(); private final JoinType joinType; private final TypeDescription smallSchema; diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java index cae6d0049..8cd6cd201 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/CFWorkerInfo.java @@ -36,6 +36,7 @@ public class CFWorkerInfo implements WorkerInfo private final int stageId; private final String operatorName; private final List hashValues; + private boolean passSchema; public CFWorkerInfo(String ip, int port, long transId, int stageId, String operatorName, List hashValues) @@ -46,6 +47,7 @@ 
public CFWorkerInfo(String ip, int port, long transId, int stageId, this.stageId = stageId; this.operatorName = operatorName; this.hashValues = hashValues; + this.passSchema = false; } public CFWorkerInfo(TurboProto.WorkerInfo workerInfo) @@ -56,8 +58,13 @@ public CFWorkerInfo(TurboProto.WorkerInfo workerInfo) this.stageId = workerInfo.getStageId(); this.operatorName = workerInfo.getOperatorName(); this.hashValues = workerInfo.getHashValuesList(); + this.passSchema = false; } + public void setPassSchema(boolean passSchema) { this.passSchema = passSchema; } + + public boolean getPassSchema() { return passSchema; } + public String getIp() { return ip; diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java index 20e93ef05..2ed8a52b2 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/PlanCoordinator.java @@ -66,6 +66,11 @@ public void addStageCoordinator(StageCoordinator stageCoordinator, StageDependen checkArgument(stageCoordinator.getStageId() == stageDependency.getCurrentStageId(), "the stageDependency does not belong to the stageCoordinator"); this.stageCoordinators.put(stageId, stageCoordinator); + if (stageDependency.getDownStreamStageId() != -1) + { + StageCoordinator parentStageCoordinator = this.stageCoordinators.get(stageDependency.getDownStreamStageId()); + stageCoordinator.setDownStreamWorkerNum(parentStageCoordinator.getFixedWorkerNum()); + } this.stageDependencies.put(stageId, stageDependency); } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java index a49ff74d1..17bbb4b37 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/StageCoordinator.java @@ -53,13 +53,14 @@ public class StageCoordinator private final int stageId; private final boolean isQueued; + private int downStreamWorkerNum; private final int fixedWorkerNum; private final TaskQueue taskQueue; private final Map> workerIdToWorkers = new ConcurrentHashMap<>(); // this.workers is used for dependency checking, no concurrent reads and writes private final List> workers = new ArrayList<>(); - private final Map workerIdToWorkerIndex = new ConcurrentHashMap<>(); - private final AtomicInteger workerIndexAssigner = new AtomicInteger(0); + private final Map> workerIdToWorkerIndex = new ConcurrentHashMap<>(); + private int workerIndexAssigner; private final Object lock = new Object(); /** @@ -78,6 +79,8 @@ public StageCoordinator(int stageId, int workerNum) this.isQueued = false; this.fixedWorkerNum = workerNum; this.taskQueue = null; + this.downStreamWorkerNum = 0; + this.workerIndexAssigner = 0; } /** @@ -94,6 +97,8 @@ public StageCoordinator(int stageId, List pendingTasks) this.isQueued = true; this.fixedWorkerNum = 0; this.taskQueue = new TaskQueue<>(pendingTasks); + this.downStreamWorkerNum = 0; + this.workerIndexAssigner = 0; } /** @@ -105,7 +110,44 @@ public void addWorker(Worker worker) synchronized (this.lock) { this.workerIdToWorkers.put(worker.getWorkerId(), worker); - this.workerIdToWorkerIndex.put(worker.getWorkerId(), this.workerIndexAssigner.getAndIncrement()); + if (fixedWorkerNum > 0 && 
downStreamWorkerNum > 0) + { + if (downStreamWorkerNum > fixedWorkerNum) + { + // one-to-multiple stream + // TODO: find a query to test + List workerIndexs = new ArrayList<>(); + int num = downStreamWorkerNum / fixedWorkerNum; + if (downStreamWorkerNum > fixedWorkerNum*num) + { + num++; + } + for (int i = 0; i < num; i++) + { + workerIndexs.add(this.workerIndexAssigner % this.downStreamWorkerNum); + this.workerIndexAssigner++; + } + } else + { + // multiple-to-one stream +// if (workerIndexAssigner < downStreamWorkerNum) +// { +// worker.getWorkerInfo().setPassSchema(true); +// } + worker.setWorkerPortIndex(this.workerIndexAssigner / this.downStreamWorkerNum); + List workerIndexes = new ArrayList<>(); + workerIndexes.add(this.workerIndexAssigner % this.downStreamWorkerNum); + this.workerIndexAssigner++; + this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexes); + } + } else + { + // assume one-to-one stream + worker.setWorkerPortIndex(0); + List workerIndexs = new ArrayList<>(this.workerIndexAssigner); + this.workerIndexAssigner++; + this.workerIdToWorkerIndex.put(worker.getWorkerId(), workerIndexs); + } this.workers.add(worker); if (!this.isQueued && this.workers.size() == this.fixedWorkerNum) { @@ -191,14 +233,14 @@ public Worker getWorker(long workerId) * @param workerId the (global) id of the worker * @return the index of the worker in this stage, or < 0 if the worker is not found */ - public int getWorkerIndex(long workerId) + public List getWorkerIndex(long workerId) { - Integer index = this.workerIdToWorkerIndex.get(workerId); + List index = this.workerIdToWorkerIndex.get(workerId); if (index != null) { return index; } - return -1; + return null; } /** @@ -246,4 +288,20 @@ public List> getWorkers() { return this.workers; } + + /** + * set down stream workers num + */ + public void setDownStreamWorkerNum(int downStreamWorkerNum) + { + this.downStreamWorkerNum = downStreamWorkerNum; + } + + /** + * get worker num of this stage + */ + public int getFixedWorkerNum() + { + return this.fixedWorkerNum; + } } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java index 40f730658..da8f64920 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateService.java @@ -75,7 +75,7 @@ public Worker registerWorker(CFWorkerInfo workerInfo) throws Worke throw new WorkerCoordinateException("failed to register worker, error code=" + response.getErrorCode()); } return new Worker<>(response.getWorkerId(), - new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), workerInfo); + new Lease(response.getLeasePeriodMs(), response.getLeaseStartTimeMs()), response.getWorkerPortIndex(), workerInfo); } /** diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java index a13cd4b31..a9794ff2b 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/coordinate/WorkerCoordinateServiceImpl.java @@ -57,7 +57,7 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request, CFWorkerInfo workerInfo = new 
CFWorkerInfo(request.getWorkerInfo()); Lease lease = new Lease(WorkerLeasePeriodMs, System.currentTimeMillis()); long workerId = CFWorkerManager.Instance().createWorkerId(); - Worker worker = new Worker<>(workerId, lease, workerInfo); + Worker worker = new Worker<>(workerId, lease, 0, workerInfo); CFWorkerManager.Instance().registerCFWorker(worker); log.debug("register worker, local address: " + workerInfo.getIp() + ", transId: " + workerInfo.getTransId() + ", stageId: " + workerInfo.getStageId() + ", workerId: " + workerId); @@ -67,7 +67,7 @@ public void registerWorker(TurboProto.RegisterWorkerRequest request, requireNonNull(stageCoordinator, "stage coordinator is not found"); stageCoordinator.addWorker(worker); TurboProto.RegisterWorkerResponse response = TurboProto.RegisterWorkerResponse.newBuilder() - .setErrorCode(SUCCESS).setWorkerId(workerId).setLeasePeriodMs(lease.getPeriodMs()) + .setErrorCode(SUCCESS).setWorkerId(workerId).setWorkerPortIndex(worker.getWorkerPortIndex()).setLeasePeriodMs(lease.getPeriodMs()) .setLeaseStartTimeMs(lease.getStartTimeMs()).build(); responseObserver.onNext(response); responseObserver.onCompleted(); @@ -99,22 +99,21 @@ public void getDownstreamWorkers(TurboProto.GetDownstreamWorkersRequest request, } else { - int workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId); - if (workerIndex < 0) + List workerIndex = planCoordinator.getStageCoordinator(workerInfo.getStageId()).getWorkerIndex(workerId); + if (workerIndex == null) { builder.setErrorCode(ErrorCode.WORKER_COORDINATE_WORKER_NOT_FOUND); } else { - if (workerIndex >= downStreamWorkers.size()) - { - builder.setErrorCode(ErrorCode.WORKER_COORDINATE_NO_DOWNSTREAM); - } - else + for (Integer index : workerIndex) { // get the worker with the same index in the downstream stage as the downstream worker builder.setErrorCode(SUCCESS); - builder.addDownstreamWorkers(downStreamWorkers.get(workerIndex).getWorkerInfo().toProto()); + if (index < downStreamWorkers.size()) + { + builder.addDownstreamWorkers(downStreamWorkers.get(index).getWorkerInfo().toProto()); + } } } } diff --git a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/SingleStageJoinStreamOperator.java b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/SingleStageJoinStreamOperator.java index 89287fa8e..98ff129af 100644 --- a/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/SingleStageJoinStreamOperator.java +++ b/pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/SingleStageJoinStreamOperator.java @@ -19,12 +19,19 @@ */ package io.pixelsdb.pixels.planner.plan.physical; +import io.pixelsdb.pixels.common.turbo.InvokerFactory; import io.pixelsdb.pixels.common.turbo.Output; +import io.pixelsdb.pixels.common.turbo.WorkerType; import io.pixelsdb.pixels.executor.join.JoinAlgorithm; import io.pixelsdb.pixels.planner.plan.physical.input.JoinInput; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static io.pixelsdb.pixels.planner.plan.physical.OperatorExecutor.waitForCompletion; /** * @author hank @@ -32,6 +39,8 @@ */ public class SingleStageJoinStreamOperator extends SingleStageJoinOperator { + private static final Logger logger = LogManager.getLogger(SingleStageJoinStreamOperator.class); + public SingleStageJoinStreamOperator(String name, boolean complete, JoinInput 
joinInput, JoinAlgorithm joinAlgo) { @@ -47,14 +56,74 @@ public SingleStageJoinStreamOperator(String name, boolean complete, @Override public CompletableFuture[]> execute() { - // TODO: implement - return null; + return executePrev().handle((result, exception) -> + { + if (exception != null) + { + throw new CompletionException("failed to complete the previous stages", exception); + } + joinOutputs = new CompletableFuture[joinInputs.size()]; + for (int i = 0; i < joinInputs.size(); ++i) + { + JoinInput joinInput = joinInputs.get(i); + if (joinAlgo == JoinAlgorithm.BROADCAST) + { + joinOutputs[i] = InvokerFactory.Instance() + .getInvoker(WorkerType.BROADCAST_JOIN_STREAMING).invoke(joinInput); + } + else if (joinAlgo == JoinAlgorithm.BROADCAST_CHAIN) + { +// joinOutputs[i] = InvokerFactory.Instance() +// .getInvoker(WorkerType.BROADCAST_CHAIN_JOIN).invoke(joinInput); + throw new UnsupportedOperationException("join algorithm '" + joinAlgo + "' is unsupported"); + } + else + { + throw new UnsupportedOperationException("join algorithm '" + joinAlgo + "' is unsupported"); + } + } + + logger.debug("invoke " + this.getName()); + return joinOutputs; + }); } @Override public CompletableFuture executePrev() { - // TODO: implement - return null; + CompletableFuture prevStagesFuture = new CompletableFuture<>(); + operatorService.execute(() -> + { + try + { + CompletableFuture[]> smallChildFuture = null; + if (smallChild != null) + { + throw new InterruptedException(); + } + CompletableFuture[]> largeChildFuture = null; + if (largeChild != null) + { + largeChildFuture = largeChild.execute(); + } + if (smallChildFuture != null) + { + CompletableFuture[] smallChildOutputs = smallChildFuture.join(); + waitForCompletion(smallChildOutputs); + } + if (largeChildFuture != null) + { + CompletableFuture[] largeChildOutputs = largeChildFuture.join(); +// waitForCompletion(largeChildOutputs, LargeSideCompletionRatio); + } + prevStagesFuture.complete(null); + } + catch (InterruptedException e) + { + throw new CompletionException("interrupted when waiting for the completion of previous stages", e); + } + }); + + return prevStagesFuture; } } diff --git a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvoker.java b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvoker.java new file mode 100644 index 000000000..90fb29a5e --- /dev/null +++ b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvoker.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ +package io.pixelsdb.pixels.invoker.vhive; + +import com.alibaba.fastjson.JSON; +import com.google.common.util.concurrent.ListenableFuture; +import io.pixelsdb.pixels.common.turbo.Input; +import io.pixelsdb.pixels.common.turbo.Output; +import io.pixelsdb.pixels.planner.plan.physical.input.BroadcastJoinInput; +import io.pixelsdb.pixels.planner.plan.physical.output.JoinOutput; +import io.pixelsdb.pixels.turbo.TurboProto; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.CompletableFuture; + +public class BroadcastJoinStreamInvoker extends VhiveInvoker +{ + private final Logger log = LogManager.getLogger(BroadcastJoinInvoker.class); + + protected BroadcastJoinStreamInvoker(String functionName) + { + super(functionName); + } + + @Override + public Output parseOutput(String outputJson) + { + return JSON.parseObject(outputJson, JoinOutput.class); + } + + @Override + public CompletableFuture invoke(Input input) + { +// log.info(String.format("invoke BroadcastJoinStreamInput: %s", JSON.toJSONString(input, SerializerFeature.PrettyFormat, SerializerFeature.DisableCircularReferenceDetect))); + ListenableFuture future = Vhive.Instance().getAsyncClient().broadcastJoinStream((BroadcastJoinInput) input); + return genCompletableFuture(future); + } +} diff --git a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvokerProvider.java b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvokerProvider.java new file mode 100644 index 000000000..d0c9bea5c --- /dev/null +++ b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/BroadcastJoinStreamInvokerProvider.java @@ -0,0 +1,31 @@ +package io.pixelsdb.pixels.invoker.vhive; + +import io.pixelsdb.pixels.common.turbo.FunctionService; +import io.pixelsdb.pixels.common.turbo.Invoker; +import io.pixelsdb.pixels.common.turbo.InvokerProvider; +import io.pixelsdb.pixels.common.turbo.WorkerType; +import io.pixelsdb.pixels.common.utils.ConfigFactory; + +public class BroadcastJoinStreamInvokerProvider implements InvokerProvider +{ + private static final ConfigFactory config = ConfigFactory.Instance(); + + @Override + public Invoker createInvoker() + { + String broadcastJoinWorker = config.getProperty("broadcast.join.worker.name"); + return new BroadcastJoinStreamInvoker(broadcastJoinWorker); + } + + @Override + public WorkerType workerType() + { + return WorkerType.BROADCAST_JOIN_STREAMING; + } + + @Override + public boolean compatibleWith(FunctionService functionService) + { + return functionService.equals(FunctionService.vhive); + } +} diff --git a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/WorkerAsyncClient.java b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/WorkerAsyncClient.java index a7d11a811..7740c5480 100644 --- a/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/WorkerAsyncClient.java +++ b/pixels-turbo/pixels-invoker-vhive/src/main/java/io/pixelsdb/pixels/invoker/vhive/WorkerAsyncClient.java @@ -93,6 +93,15 @@ public ListenableFuture broadcastJoin(BroadcastJoinIn return this.stub.process(request); } + public ListenableFuture broadcastJoinStream(BroadcastJoinInput input) + { + TurboProto.WorkerRequest request = TurboProto.WorkerRequest.newBuilder() + .setWorkerType(String.valueOf(WorkerType.BROADCAST_JOIN_STREAMING)) + .setJson(JSON.toJSONString(input, 
SerializerFeature.DisableCircularReferenceDetect)) + .build(); + return this.stub.process(request); + } + public ListenableFuture partitionChainJoin(PartitionedChainJoinInput input) { TurboProto.WorkerRequest request = TurboProto.WorkerRequest.newBuilder() diff --git a/pixels-turbo/pixels-invoker-vhive/src/main/resources/META-INF/services/io.pixelsdb.pixels.common.turbo.InvokerProvider b/pixels-turbo/pixels-invoker-vhive/src/main/resources/META-INF/services/io.pixelsdb.pixels.common.turbo.InvokerProvider index cbc049c55..9da89df78 100644 --- a/pixels-turbo/pixels-invoker-vhive/src/main/resources/META-INF/services/io.pixelsdb.pixels.common.turbo.InvokerProvider +++ b/pixels-turbo/pixels-invoker-vhive/src/main/resources/META-INF/services/io.pixelsdb.pixels.common.turbo.InvokerProvider @@ -5,6 +5,7 @@ io.pixelsdb.pixels.invoker.vhive.PartitionStreamInvokerProvider io.pixelsdb.pixels.invoker.vhive.PartitionJoinInvokerProvider io.pixelsdb.pixels.invoker.vhive.PartitionedJoinStreamInvokerProvider io.pixelsdb.pixels.invoker.vhive.PartitionChainJoinInvokerProvider +io.pixelsdb.pixels.invoker.vhive.BroadcastJoinStreamInvokerProvider io.pixelsdb.pixels.invoker.vhive.BroadcastJoinInvokerProvider io.pixelsdb.pixels.invoker.vhive.BroadcastChainJoinInvokerProvider io.pixelsdb.pixels.invoker.vhive.AggregationInvokerProvider diff --git a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BaseBroadcastJoinWorker.java b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BaseBroadcastJoinWorker.java index 947e26e75..b4833435c 100644 --- a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BaseBroadcastJoinWorker.java +++ b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/BaseBroadcastJoinWorker.java @@ -142,6 +142,7 @@ public JoinOutput process(BroadcastJoinInput event) Joiner joiner = new Joiner(joinType, WorkerCommon.getResultSchema(leftSchema.get(), leftCols), leftColAlias, leftProjection, leftKeyColumnIds, WorkerCommon.getResultSchema(rightSchema.get(), rightCols), rightColAlias, rightProjection, rightKeyColumnIds); + logger.info("joiner joined schema is {}", joiner.getJoinedSchema().getChildren()); // build the hash table for the left table. 
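The provider registered in the service file above is resolved through InvokerFactory by worker type, so planner code never names the invoker class directly. A compact sketch of that dispatch as used by SingleStageJoinStreamOperator, assuming invoke() returns a CompletableFuture of Output as it does elsewhere in this patch; building a populated BroadcastJoinInput is out of scope here and the class name is illustrative.

import io.pixelsdb.pixels.common.turbo.InvokerFactory;
import io.pixelsdb.pixels.common.turbo.Output;
import io.pixelsdb.pixels.common.turbo.WorkerType;
import io.pixelsdb.pixels.planner.plan.physical.input.BroadcastJoinInput;
import io.pixelsdb.pixels.planner.plan.physical.output.JoinOutput;

import java.util.concurrent.CompletableFuture;

public class BroadcastJoinStreamDispatchSketch
{
    /**
     * Invoke the streaming broadcast-join worker for one join input and wait for its output.
     * BroadcastJoinStreamInvoker.parseOutput() deserializes the worker response into a JoinOutput.
     */
    public static JoinOutput invoke(BroadcastJoinInput joinInput) throws Exception
    {
        CompletableFuture<Output> future = InvokerFactory.Instance()
                .getInvoker(WorkerType.BROADCAST_JOIN_STREAMING)
                .invoke(joinInput);
        return (JoinOutput) future.get();
    }
}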
List leftFutures = new ArrayList<>(); for (InputSplit inputSplit : leftInputs) @@ -165,6 +166,7 @@ public JoinOutput process(BroadcastJoinInput event) } logger.info("hash table size: " + joiner.getSmallTableSize() + ", duration (ns): " + (workerMetrics.getInputCostNs() + workerMetrics.getComputeCostNs())); + logger.info("joiner joined schema is {}", joiner.getJoinedSchema().getChildren()); List> result = new ArrayList<>(); if (partitionOutput) @@ -215,6 +217,7 @@ public JoinOutput process(BroadcastJoinInput event) throw new WorkerException("error occurred threads, please check the stacktrace before this log record"); } } + logger.info("joiner joined schema is {}", joiner.getJoinedSchema().getChildren()); String outputPath = outputFolder + outputInfo.getFileNames().get(0); try @@ -242,6 +245,7 @@ public JoinOutput process(BroadcastJoinInput event) } else { + logger.info("joiner joined schema is {}", joiner.getJoinedSchema().getChildren()); pixelsWriter = WorkerCommon.getWriter(joiner.getJoinedSchema(), WorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, encoding, false, null); diff --git a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java index dd6ce3c10..de844fa74 100644 --- a/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java +++ b/pixels-turbo/pixels-worker-common/src/main/java/io/pixelsdb/pixels/worker/common/StreamWorkerCommon.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.net.URI; +import java.util.HashMap; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutorService; @@ -45,6 +46,8 @@ public class StreamWorkerCommon extends WorkerCommon { private static final Logger logger = LogManager.getLogger(StreamWorkerCommon.class); private static final Storage http = null; // placeholder. todo: modularize into a pixels-storage-stream module. + private static final HashMap readerMap = new HashMap<>(); + private static final HashMap writerMap = new HashMap<>(); public static void initStorage(StorageInfo storageInfo, Boolean isOutput) throws IOException { @@ -82,6 +85,7 @@ public static void passSchemaToNextLevel(TypeDescription schema, StorageInfo sto { throw new IllegalArgumentException("Attempt to call a streaming mode function with a non-HTTP storage"); } + logger.info("pass schema to endpoint {}", endpoint); PixelsWriter pixelsWriter = getWriter(schema, null, endpoint, false, false, PARTITION_ID_SCHEMA_WRITER, null, null, true); pixelsWriter.close(); // We utilize the sendRowGroup() in PixelsWriterStreamImpl's close() to send the schema. 
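Both halves of the schema handshake mentioned in the comment above, condensed into one sketch: the upstream worker ships only the schema by opening and closing a schema writer through passSchemaToNextLevel(), and the downstream worker blocks on its stream reader until that packet arrives. The call shapes mirror the ones used in this patch; the sketch's own class and method names and the example endpoint are illustrative, and the StorageInfo is assumed to describe the httpstream scheme.

import io.pixelsdb.pixels.core.PixelsReader;
import io.pixelsdb.pixels.core.PixelsReaderStreamImpl;
import io.pixelsdb.pixels.core.TypeDescription;
import io.pixelsdb.pixels.planner.plan.physical.domain.StorageInfo;
import io.pixelsdb.pixels.worker.common.StreamWorkerCommon;

import java.util.List;

public class SchemaHandshakeSketch
{
    /** Upstream side: send only the joined schema to the downstream endpoints. */
    public static void sendSchema(TypeDescription schema, StorageInfo httpStreamStorageInfo,
                                  List<String> downstreamEndpoints) throws Exception
    {
        // Internally opens a schema writer (PARTITION_ID_SCHEMA_WRITER) and closes it,
        // which ships the schema as the stream header of an empty row group.
        StreamWorkerCommon.passSchemaToNextLevel(schema, httpStreamStorageInfo, downstreamEndpoints);
    }

    /** Downstream side: block on the stream reader until the schema packet arrives. */
    public static TypeDescription receiveSchema(String endpoint) throws Exception
    {
        // e.g. "http://localhost:18686/" (hypothetical). The reader is left open on purpose,
        // since the same connection later carries the actual row groups.
        PixelsReader reader = new PixelsReaderStreamImpl(endpoint);
        return reader.getFileSchema();
    }
}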
@@ -133,6 +137,64 @@ public static TypeDescription getSchemaFromPaths(Storage storage, List p return WorkerCommon.getFileSchemaFromPaths(storage, paths); } + public static void getSchemaFromTwoPaths(ExecutorService executor, + Storage leftStorage, Storage rightStorage, + AtomicReference leftSchema, + AtomicReference rightSchema, + List leftInputSplits, List rightEndpoints) + { + requireNonNull(executor, "executor is null"); + requireNonNull(leftSchema, "leftSchema is null"); + requireNonNull(rightSchema, "rightSchema is null"); + requireNonNull(leftInputSplits, "leftPaths is null"); + requireNonNull(rightEndpoints, "rightPaths is null"); + + if (leftStorage != http && rightStorage == http) + { + Future leftFuture = executor.submit(() -> { + try + { + leftSchema.set(getFileSchemaFromSplits(leftStorage, leftInputSplits)); + } catch (IOException | InterruptedException e) + { + logger.error("failed to read the file schema for the left table", e); + } + }); + Future rightFuture = executor.submit(() -> { + try + { + for (String endpoint : rightEndpoints) + { + PixelsReader pixelsReader; + if (!readerMap.containsKey(endpoint)) + { + pixelsReader = new PixelsReaderStreamImpl(endpoint); + readerMap.put(endpoint, pixelsReader); + } else + { + pixelsReader = readerMap.get(endpoint); + } + rightSchema.set(pixelsReader.getFileSchema()); + logger.info("succeed to get right schema for {}", endpoint); + } + // XXX: This `close()` makes the test noticeably slower. Will need to look into it. + } catch (Exception e) + { + e.printStackTrace(); + } + }); + try + { + leftFuture.get(); + rightFuture.get(); + } catch (Throwable e) + { + logger.error("interrupted while waiting for the termination of schema read", e); + } + } else + throw new UnsupportedOperationException("schema is not compatible"); + } + public static void getSchemaFromPaths(ExecutorService executor, Storage leftStorage, Storage rightStorage, AtomicReference leftSchema, @@ -201,8 +263,18 @@ public static PixelsReader getReader(Storage.Scheme storageScheme, String endpoi requireNonNull(endpoint, "fileName is null"); if (storageScheme == Storage.Scheme.httpstream) { - logger.debug("getReader streaming mode: " + endpoint); - return new PixelsReaderStreamImpl(endpoint, partitioned, numPartitions); + logger.info("getReader streaming mode: " + endpoint); + PixelsReader pixelsReader; +// if (readerMap.containsKey(endpoint)) +// { +// pixelsReader = readerMap.get(endpoint); +// } else +// { +// pixelsReader = new PixelsReaderStreamImpl(endpoint, partitioned, numPartitions); +// } + pixelsReader = new PixelsReaderStreamImpl(endpoint, partitioned, numPartitions); + + return pixelsReader; } else return WorkerCommon.getReader(endpoint, WorkerCommon.getStorage(storageScheme)); } @@ -237,23 +309,34 @@ public static PixelsWriter getWriter(TypeDescription schema, Storage storage, checkArgument(!isPartitioned || outputEndpoints != null, "outputPaths is null whereas isPartitioned is true"); - PixelsWriterStreamImpl.Builder builder = PixelsWriterStreamImpl.newBuilder(); - builder.setSchema(schema) - .setPixelStride(pixelStride) - .setRowGroupSize(rowGroupSize) - .setEncodingLevel(EncodingLevel.EL2) // it is worth to do encoding - .setPartitioned(isPartitioned) - .setPartitionId(isSchemaWriter ? PARTITION_ID_SCHEMA_WRITER : (isPartitioned ? 
partitionId : -1)); - if (!isPartitioned) - { - builder.setUri(URI.create(outputPath)); - } - else - { - builder.setFileNames(outputEndpoints) - .setPartKeyColumnIds(keyColumnIds); - } - return builder.build(); + PixelsWriter pixelsWriter; +// if (writerMap.containsKey(outputPath)) +// { +// pixelsWriter = writerMap.get(outputPath); +// logger.info("get pixels stream writer from old value"); +// } else +// { + PixelsWriterStreamImpl.Builder builder = PixelsWriterStreamImpl.newBuilder(); + builder.setSchema(schema) + .setPixelStride(pixelStride) + .setRowGroupSize(rowGroupSize) + .setEncodingLevel(EncodingLevel.EL2) // it is worth to do encoding + .setPartitioned(isPartitioned) + .setPartitionId(isSchemaWriter ? PARTITION_ID_SCHEMA_WRITER : (isPartitioned ? partitionId : -1)); + if (!isPartitioned) + { + builder.setUri(URI.create(outputPath)); + } + else + { + builder.setFileNames(outputEndpoints) + .setPartKeyColumnIds(keyColumnIds); + } + writerMap.put(outputPath, builder.build()); + pixelsWriter = writerMap.get(outputPath); + logger.info("get pixels stream writer from new value"); +// } + return pixelsWriter; } public static PixelsReaderOption getReaderOption(long transId, String[] cols, PixelsReader pixelsReader, @@ -270,4 +353,15 @@ public static PixelsReaderOption getReaderOption(long transId, String[] cols, Pi return option; } + + public static PixelsReaderOption getReaderOption(long transId, String[] cols) + { + PixelsReaderOption option = new PixelsReaderOption(); + option.skipCorruptRecords(true); + option.tolerantSchemaEvolution(true); + option.transId(transId); + option.includeCols(cols); + + return option; + } } diff --git a/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/BroadcastJoinStreamWorker.java b/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/BroadcastJoinStreamWorker.java new file mode 100644 index 000000000..17e70965e --- /dev/null +++ b/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/BroadcastJoinStreamWorker.java @@ -0,0 +1,490 @@ +/* + * Copyright 2024 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . 
+ */ +package io.pixelsdb.pixels.worker.vhive; + +import com.alibaba.fastjson.JSON; +import com.google.common.collect.ImmutableList; +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.turbo.WorkerType; +import io.pixelsdb.pixels.core.PixelsReader; +import io.pixelsdb.pixels.core.PixelsWriter; +import io.pixelsdb.pixels.core.TypeDescription; +import io.pixelsdb.pixels.core.reader.PixelsReaderOption; +import io.pixelsdb.pixels.core.reader.PixelsRecordReader; +import io.pixelsdb.pixels.core.utils.Bitmap; +import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; +import io.pixelsdb.pixels.executor.join.JoinType; +import io.pixelsdb.pixels.executor.join.Joiner; +import io.pixelsdb.pixels.executor.join.Partitioner; +import io.pixelsdb.pixels.executor.predicate.TableScanFilter; +import io.pixelsdb.pixels.planner.coordinate.CFWorkerInfo; +import io.pixelsdb.pixels.planner.coordinate.WorkerCoordinateService; +import io.pixelsdb.pixels.planner.plan.logical.Join; +import io.pixelsdb.pixels.planner.plan.physical.domain.*; +import io.pixelsdb.pixels.planner.plan.physical.input.BroadcastJoinInput; +import io.pixelsdb.pixels.planner.plan.physical.input.JoinInput; +import io.pixelsdb.pixels.planner.plan.physical.output.JoinOutput; +import io.pixelsdb.pixels.worker.common.*; +import io.pixelsdb.pixels.worker.vhive.utils.RequestHandler; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.sql.Time; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +public class BroadcastJoinStreamWorker extends BaseBroadcastJoinWorker implements RequestHandler { + private final Logger logger; + protected WorkerCoordinateService workerCoordinatorService; + private final WorkerMetrics workerMetrics; + private io.pixelsdb.pixels.common.task.Worker worker; + + public BroadcastJoinStreamWorker(WorkerContext context) { + super(context); + this.logger = context.getLogger(); + this.workerMetrics = context.getWorkerMetrics(); + this.workerMetrics.clear(); + } + + @Override + public JoinOutput handleRequest(BroadcastJoinInput input) { + long startTime = System.currentTimeMillis(); + try { + int stageId = input.getStageId(); + long transId = input.getTransId(); + String ip = WorkerCommon.getIpAddress(); + int port = WorkerCommon.getPort(); + String coordinatorIp = WorkerCommon.getCoordinatorIp(); + int coordinatorPort = WorkerCommon.getCoordinatorPort(); + CFWorkerInfo workerInfo = new CFWorkerInfo(ip, port, transId, stageId, "broadcast_join", Collections.emptyList()); + workerCoordinatorService = new WorkerCoordinateService(coordinatorIp, coordinatorPort); + worker = workerCoordinatorService.registerWorker(workerInfo); + JoinOutput output = process(input); + workerCoordinatorService.terminateWorker(worker.getWorkerId()); + return output; + } catch (Throwable e) { + JoinOutput joinOutput = new JoinOutput(); + this.logger.error("error during registering worker", e); + joinOutput.setSuccessful(false); + joinOutput.setErrorMessage(e.getMessage()); + joinOutput.setDurationMs((int) (System.currentTimeMillis() - startTime)); + return joinOutput; + } + } + + @Override + public String getRequestId() { + return this.context.getRequestId(); + } + + @Override + public WorkerType getWorkerType() { + return WorkerType.BROADCAST_JOIN_STREAMING; + } + + @Override + public 
JoinOutput process(BroadcastJoinInput input) { + JoinOutput joinOutput = new JoinOutput(); + long startTime = System.currentTimeMillis(); + joinOutput.setStartTimeMs(startTime); + joinOutput.setRequestId(context.getRequestId()); + joinOutput.setSuccessful(true); + joinOutput.setErrorMessage(""); + + try { + int cores = Runtime.getRuntime().availableProcessors(); + logger.info("Number of cores available: " + cores); + WorkerThreadExceptionHandler exceptionHandler = new WorkerThreadExceptionHandler(logger); + ExecutorService threadPool = Executors.newFixedThreadPool(cores * 2, + new WorkerThreadFactory(exceptionHandler)); + + long transId = input.getTransId(); + BroadcastTableInfo leftTable = requireNonNull(input.getSmallTable(), "leftTable is null"); + StorageInfo leftInputStorageInfo = requireNonNull(leftTable.getStorageInfo(), "leftStorageInfo is null"); + List leftInputs = requireNonNull(leftTable.getInputSplits(), "leftInputs is null"); + checkArgument(leftInputs.size() > 0, "left table is empty"); + String[] leftColumnsToRead = leftTable.getColumnsToRead(); + int[] leftKeyColumnIds = leftTable.getKeyColumnIds(); + TableScanFilter leftFilter = JSON.parseObject(leftTable.getFilter(), TableScanFilter.class); + + BroadcastTableInfo rightTable = requireNonNull(input.getLargeTable(), "rightTable is null"); + StorageInfo rightInputStorageInfo = requireNonNull(rightTable.getStorageInfo(), "rightStorageInfo is null"); + List rightInputs = requireNonNull(rightTable.getInputSplits(), "rightInputs is null"); + checkArgument(rightInputs.size() > 0, "right table is empty"); + String[] rightColumnsToRead = rightTable.getColumnsToRead(); + int[] rightKeyColumnIds = rightTable.getKeyColumnIds(); + TableScanFilter rightFilter = JSON.parseObject(rightTable.getFilter(), TableScanFilter.class); + + String[] leftColAlias = input.getJoinInfo().getSmallColumnAlias(); + String[] rightColAlias = input.getJoinInfo().getLargeColumnAlias(); + boolean[] leftProjection = input.getJoinInfo().getSmallProjection(); + boolean[] rightProjection = input.getJoinInfo().getLargeProjection(); + JoinType joinType = input.getJoinInfo().getJoinType(); + checkArgument(joinType != JoinType.EQUI_LEFT && joinType != JoinType.EQUI_FULL, + "broadcast join can not be used for LEFT_OUTER or FULL_OUTER join"); + + MultiOutputInfo outputInfo = input.getOutput(); + StorageInfo outputStorageInfo = outputInfo.getStorageInfo(); + checkArgument(outputInfo.getFileNames().size() == 1, + "it is incorrect to have more than one output files"); + String outputFolder = outputInfo.getPath(); + if (!outputFolder.endsWith("/")) { + outputFolder += "/"; + } + boolean encoding = outputInfo.isEncoding(); + + StreamWorkerCommon.initStorage(leftInputStorageInfo); + StreamWorkerCommon.initStorage(rightInputStorageInfo); + StreamWorkerCommon.initStorage(outputStorageInfo); + + List rightEndpoints = Arrays.asList("http://localhost:18686/", "http://localhost:18687/", "http://localhost:18688/", "http://localhost:18689/"); + boolean partitionOutput = input.getJoinInfo().isPostPartition(); + PartitionInfo outputPartitionInfo = input.getJoinInfo().getPostPartitionInfo(); + if (partitionOutput) { + requireNonNull(outputPartitionInfo, "outputPartitionInfo is null"); + } + + // build the joiner + AtomicReference leftSchema = new AtomicReference<>(); + AtomicReference rightSchema = new AtomicReference<>(); + if (leftInputStorageInfo.getScheme() == Storage.Scheme.httpstream && rightInputStorageInfo.getScheme() == Storage.Scheme.httpstream) { + 
StreamWorkerCommon.getSchemaFromPaths(threadPool, + StreamWorkerCommon.getStorage(leftInputStorageInfo.getScheme()), + StreamWorkerCommon.getStorage(rightInputStorageInfo.getScheme()), + leftSchema, rightSchema, + Collections.singletonList("http://localhost:18688/"), + Collections.singletonList("http://localhost:18686/")); + } else if (leftInputStorageInfo.getScheme() != Storage.Scheme.httpstream && rightInputStorageInfo.getScheme() == Storage.Scheme.httpstream) { + StreamWorkerCommon.getSchemaFromTwoPaths(threadPool, + StreamWorkerCommon.getStorage(leftInputStorageInfo.getScheme()), + StreamWorkerCommon.getStorage(rightInputStorageInfo.getScheme()), + leftSchema, rightSchema, + leftInputs, + rightEndpoints); + } else if (leftInputStorageInfo.getScheme() != Storage.Scheme.httpstream && rightInputStorageInfo.getScheme() != Storage.Scheme.httpstream) { + WorkerCommon.getFileSchemaFromSplits(threadPool, + StreamWorkerCommon.getStorage(leftInputStorageInfo.getScheme()), + StreamWorkerCommon.getStorage(rightInputStorageInfo.getScheme()), + leftSchema, rightSchema, leftInputs, rightInputs); + } + Joiner joiner = new Joiner(joinType, + StreamWorkerCommon.getResultSchema(leftSchema.get(), leftColumnsToRead), + leftColAlias, leftProjection, leftKeyColumnIds, + StreamWorkerCommon.getResultSchema(rightSchema.get(), rightColumnsToRead), + rightColAlias, rightProjection, rightKeyColumnIds); + logger.info("succeed to get left and right schema"); + + // build the hash table for the left table. + List leftFutures = new ArrayList<>(); + for (InputSplit inputSplit : leftInputs) { + List inputs = new LinkedList<>(inputSplit.getInputInfos()); + leftFutures.add(threadPool.submit(() -> { + try { + buildHashTable(transId, joiner, inputs, leftInputStorageInfo.getScheme(), + !leftTable.isBase(), leftColumnsToRead, leftFilter, workerMetrics); + } catch (Throwable e) { + throw new WorkerException("error during hash table construction", e); + } + })); + } + for (Future future : leftFutures) { + future.get(); + } + logger.info("succeed to build hash table"); + + List> result = new ArrayList<>(); + if (partitionOutput) { + for (int i = 0; i < outputPartitionInfo.getNumPartition(); ++i) { + result.add(new ConcurrentLinkedQueue<>()); + } + } else { + result.add(new ConcurrentLinkedQueue<>()); + } + + // scan the right table and do the join. 
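The hard-coded localhost endpoints above and the 18686 + getWorkerPortIndex() expression used later when building the output endpoints appear to follow one convention: a downstream worker listens on one port per upstream writer starting at a base port of 18686, and each upstream worker selects its port with the workerPortIndex assigned at registration. A small sketch of that convention; the base-port constant and the helper names are assumptions inferred from the literals in this file, not an established API.

import java.util.ArrayList;
import java.util.List;

public class StreamPortConventionSketch
{
    // Base port used by the hard-coded endpoints in BroadcastJoinStreamWorker (assumed convention).
    private static final int BASE_PORT = 18686;

    /** Endpoint an upstream worker writes to, given the downstream worker's IP and its own port index. */
    public static String writeEndpoint(String downstreamIp, int workerPortIndex)
    {
        return "http://" + downstreamIp + ":" + (BASE_PORT + workerPortIndex) + "/";
    }

    /** Endpoints a downstream worker reads from when it expects upstreamWriters concurrent writers. */
    public static List<String> readEndpoints(int upstreamWriters)
    {
        List<String> endpoints = new ArrayList<>();
        for (int i = 0; i < upstreamWriters; ++i)
        {
            endpoints.add("http://localhost:" + (BASE_PORT + i) + "/");
        }
        return endpoints;
    }

    public static void main(String[] args)
    {
        // Reproduces the literal rightEndpoints list above for four upstream writers:
        // http://localhost:18686/ ... http://localhost:18689/
        System.out.println(readEndpoints(4));
        // And the write-side endpoint built later in process(): port index 2 maps to port 18688.
        System.out.println(writeEndpoint("localhost", 2));
    }
}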
+ if (joiner.getSmallTableSize() > 0) + { + if (rightInputStorageInfo.getScheme() == Storage.Scheme.httpstream) + { + logger.info("scan right input from http"); + for (String endpoint : rightEndpoints) + { + threadPool.execute(() -> { + try + { + if (partitionOutput) + { + throw new UnsupportedOperationException("don't support partitioning operation"); + } else + { + joinWithRightTable(transId, joiner, endpoint, rightInputStorageInfo.getScheme(), rightColumnsToRead, + rightFilter, result.get(0), workerMetrics, logger); + } + } catch (Throwable e) + { + throw new WorkerException("error during broadcast join", e); + } + }); + } + threadPool.shutdown(); + try + { + while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) ; + } catch (InterruptedException e) + { + throw new WorkerException("interrupted while waiting for the termination of join", e); + } + + if (exceptionHandler.hasException()) + { + throw new WorkerException("error occurred threads, please check the stacktrace before this log record"); + } + } else + { + logger.info("scan right input from non http"); + for (InputSplit inputSplit : rightInputs) + { + List inputs = new LinkedList<>(inputSplit.getInputInfos()); + threadPool.execute(() -> { + try + { + int numJoinedRows = partitionOutput ? + joinWithRightTableAndPartition( + transId, joiner, inputs, rightInputStorageInfo.getScheme(), + !rightTable.isBase(), rightColumnsToRead, rightFilter, + outputPartitionInfo, result, workerMetrics) : + joinWithRightTable(transId, joiner, inputs, rightInputStorageInfo.getScheme(), + !rightTable.isBase(), rightColumnsToRead, rightFilter, result.get(0), workerMetrics); + } catch (Throwable e) + { + throw new WorkerException("error during broadcast join", e); + } + }); + } + threadPool.shutdown(); + try + { + while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) ; + } catch (InterruptedException e) + { + throw new WorkerException("interrupted while waiting for the termination of join", e); + } + + if (exceptionHandler.hasException()) + { + throw new WorkerException("error occurred threads, please check the stacktrace before this log record"); + } + } + } + logger.info("succeed to join with right table"); + + String outputPath = outputFolder + outputInfo.getFileNames().get(0); + List downStreamWorkers = workerCoordinatorService.getDownstreamWorkers(worker.getWorkerId()) + .stream() + .sorted(Comparator.comparing(worker -> worker.getHashValues().get(0))) + .collect(ImmutableList.toImmutableList()); + List outputEndpoints = downStreamWorkers.stream() + .map(CFWorkerInfo::getIp) + .map(ip -> "http://" + ip + ":" + + (18686 + worker.getWorkerPortIndex()) + "/") + // .map(URI::create) + .collect(Collectors.toList()); + + logger.info("down stream workers are {}", downStreamWorkers); + if (!downStreamWorkers.isEmpty()) { + StreamWorkerCommon.passSchemaToNextLevel(joiner.getJoinedSchema(), outputStorageInfo, + outputEndpoints); + logger.info("succeed to pass schema to next level"); + try { + WorkerMetrics.Timer writeCostTimer = new WorkerMetrics.Timer().start(); + PixelsWriter pixelsWriter; + if (partitionOutput) { + pixelsWriter = StreamWorkerCommon.getWriter(joiner.getJoinedSchema(), + StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, encoding, + true, 0, + Arrays.stream(leftKeyColumnIds).boxed().collect(Collectors.toList()), + outputEndpoints, false); + for (int hash = 0; hash < outputPartitionInfo.getNumPartition(); ++hash) { + ConcurrentLinkedQueue batches = result.get(hash); + if (!batches.isEmpty()) { + for 
(VectorizedRowBatch batch : batches) { + pixelsWriter.addRowBatch(batch, hash); + } + } + } + } else { + pixelsWriter = StreamWorkerCommon.getWriter(joiner.getJoinedSchema(), + StreamWorkerCommon.getStorage(outputStorageInfo.getScheme()), + outputEndpoints.get(0), encoding, + false, 0, + null, null, false); + ConcurrentLinkedQueue rowBatches = result.get(0); + for (VectorizedRowBatch rowBatch : rowBatches) { + pixelsWriter.addRowBatch(rowBatch); + } + } + pixelsWriter.close(); + logger.info("succeed to write results"); + joinOutput.addOutput(outputPath, pixelsWriter.getNumRowGroup()); + if (outputStorageInfo.getScheme() == Storage.Scheme.minio) { + while (!WorkerCommon.getStorage(Storage.Scheme.minio).exists(outputPath)) { + // Wait for 10ms and see if the output file is visible. + TimeUnit.MILLISECONDS.sleep(10); + } + } + workerMetrics.addOutputCostNs(writeCostTimer.stop()); + workerMetrics.addWriteBytes(pixelsWriter.getCompletedBytes()); + workerMetrics.addNumWriteRequests(pixelsWriter.getNumWriteRequests()); + } catch (Throwable e) { + throw new WorkerException( + "failed to finish writing and close the join result file '" + outputPath + "'", e); + } + } else { + try { + PixelsWriter pixelsWriter; + WorkerMetrics.Timer writeCostTimer = new WorkerMetrics.Timer().start(); + if (partitionOutput) { + pixelsWriter = WorkerCommon.getWriter(joiner.getJoinedSchema(), + WorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, + encoding, true, Arrays.stream( + outputPartitionInfo.getKeyColumnIds()).boxed(). + collect(Collectors.toList())); + for (int hash = 0; hash < outputPartitionInfo.getNumPartition(); ++hash) { + ConcurrentLinkedQueue batches = result.get(hash); + if (!batches.isEmpty()) { + for (VectorizedRowBatch batch : batches) { + pixelsWriter.addRowBatch(batch, hash); + } + } + } + } else { + pixelsWriter = WorkerCommon.getWriter(joiner.getJoinedSchema(), + WorkerCommon.getStorage(outputStorageInfo.getScheme()), outputPath, + encoding, false, null); + ConcurrentLinkedQueue rowBatches = result.get(0); + for (VectorizedRowBatch rowBatch : rowBatches) { + pixelsWriter.addRowBatch(rowBatch); + } + } + pixelsWriter.close(); + joinOutput.addOutput(outputPath, pixelsWriter.getNumRowGroup()); + if (outputStorageInfo.getScheme() == Storage.Scheme.minio) { + while (!WorkerCommon.getStorage(Storage.Scheme.minio).exists(outputPath)) { + // Wait for 10ms and see if the output file is visible. + TimeUnit.MILLISECONDS.sleep(10); + } + } + workerMetrics.addOutputCostNs(writeCostTimer.stop()); + workerMetrics.addWriteBytes(pixelsWriter.getCompletedBytes()); + workerMetrics.addNumWriteRequests(pixelsWriter.getNumWriteRequests()); + } catch (Throwable e) { + throw new WorkerException( + "failed to finish writing and close the join result file '" + outputPath + "'", e); + } + } + joinOutput.setDurationMs((int) (System.currentTimeMillis() - startTime)); + WorkerCommon.setPerfMetrics(joinOutput, workerMetrics); + return joinOutput; + } catch (Throwable e) { + logger.error("error during join", e); + joinOutput.setSuccessful(false); + joinOutput.setErrorMessage(e.getMessage()); + joinOutput.setDurationMs((int) (System.currentTimeMillis() - startTime)); + return joinOutput; + } + } + + /** + * Scan the input files of the right table and do the join. 
+ * + * @param transId the transaction id used by I/O scheduler + * @param joiner the joiner for the broadcast join + * @param rightScheme the storage scheme of the right table + * @param rightCols the column names of the right table + * @param rightFilter the table scan filter on the right table + * @param joinResult the container of the join result + * @param workerMetrics the collector of the performance metrics + * @return the number of joined rows produced in this split + */ + public static int joinWithRightTable( + long transId, Joiner joiner, String rightEndpoint, + Storage.Scheme rightScheme, String[] rightCols, TableScanFilter rightFilter, + ConcurrentLinkedQueue joinResult, WorkerMetrics workerMetrics, Logger logger) { + logger.info("join with right table endpoint {}", rightEndpoint); + int joinedRows = 0; + WorkerMetrics.Timer readCostTimer = new WorkerMetrics.Timer(); + WorkerMetrics.Timer computeCostTimer = new WorkerMetrics.Timer(); + long readBytes = 0L; + int numReadRequests = 0; + + readCostTimer.start(); + PixelsReader pixelsReader; + try + { + pixelsReader = StreamWorkerCommon.getReader(rightScheme, rightEndpoint); + readCostTimer.stop(); + PixelsReaderOption option = StreamWorkerCommon.getReaderOption(transId, rightCols); + PixelsRecordReader recordReader = pixelsReader.read(option); + VectorizedRowBatch rowBatch; + + Bitmap filtered = new Bitmap(StreamWorkerCommon.rowBatchSize, true); + Bitmap tmp = new Bitmap(StreamWorkerCommon.rowBatchSize, false); + computeCostTimer.start(); + do + { + rowBatch = recordReader.readBatch(StreamWorkerCommon.rowBatchSize); +// logger.info("record reader read row batch size before filter {}", rowBatch.size); + rightFilter.doFilter(rowBatch, filtered, tmp); + rowBatch.applyFilter(filtered); +// logger.info("record reader read row batch size after filter {}", rowBatch.size); + if (rowBatch.size > 0) { + logger.info("row batch size > 0"); + List joinedBatches = joiner.join(rowBatch); + for (VectorizedRowBatch joined : joinedBatches) { + if (!joined.isEmpty()) { + logger.info("joined result add {}", joined.size); + joinResult.add(joined); + joinedRows += joined.size; + } + } + } + } while (!rowBatch.endOfFile); + pixelsReader.close(); + computeCostTimer.stop(); + computeCostTimer.minus(recordReader.getReadTimeNanos()); + readCostTimer.add(recordReader.getReadTimeNanos()); + readBytes += recordReader.getCompletedBytes(); + numReadRequests += recordReader.getNumReadRequests(); + } catch (Throwable e) { + throw new WorkerException("failed to scan the right table input file '" + + rightEndpoint + "' and do the join", e); + } + workerMetrics.addReadBytes(readBytes); + workerMetrics.addNumReadRequests(numReadRequests); + workerMetrics.addInputCostNs(readCostTimer.getElapsedNs()); + workerMetrics.addComputeCostNs(computeCostTimer.getElapsedNs()); + return joinedRows; + } +} \ No newline at end of file diff --git a/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/WorkerServiceImpl.java b/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/WorkerServiceImpl.java index fd86b61c7..2f17dabde 100644 --- a/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/WorkerServiceImpl.java +++ b/pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/WorkerServiceImpl.java @@ -58,6 +58,12 @@ public void process(TurboProto.WorkerRequest request, StreamObserver service = new ServiceImpl<>(BroadcastJoinStreamWorker.class, BroadcastJoinInput.class); + 
service.execute(request, responseObserver);
+                break;
+            }
             case PARTITIONED_CHAIN_JOIN:
             {
                 ServiceImpl<PartitionedChainJoinWorker, PartitionedChainJoinInput> service = new ServiceImpl<>(PartitionedChainJoinWorker.class, PartitionedChainJoinInput.class);
diff --git a/proto/turbo.proto b/proto/turbo.proto
index 6505d301d..51dc033a9 100644
--- a/proto/turbo.proto
+++ b/proto/turbo.proto
@@ -106,9 +106,10 @@ message RegisterWorkerRequest {
 message RegisterWorkerResponse {
   int32 errorCode = 1;
-  int64 workerId = 2; // the unique id assigned by the coordinator for this worker
-  int64 leaseStartTimeMs = 3; // the time since the epoch in milliseconds of the lease
-  int64 leasePeriodMs = 4; // the valid period in milliseconds of the lease
+  int32 workerPortIndex = 2; // the index of the port on which this worker writes to its downstream worker
+  int64 workerId = 3; // the unique id assigned by the coordinator for this worker
+  int64 leaseStartTimeMs = 4; // the time since the epoch in milliseconds of the lease
+  int64 leasePeriodMs = 5; // the valid period in milliseconds of the lease
 }

 message GetDownstreamWorkersRequest {
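Read together with the new workerPortIndex field in RegisterWorkerResponse, the index assignment added to StageCoordinator.addWorker() can be summarized by the integer arithmetic below: in the multiple-to-one case an upstream worker is mapped to downstream worker (assigner % downStreamWorkerNum) and to port slot (assigner / downStreamWorkerNum), while the one-to-multiple case hands each worker a block of roughly ceil(downStreamWorkerNum / fixedWorkerNum) downstream indexes. The one-to-multiple branch is still marked TODO in the patch, so its port slot and bookkeeping below are assumptions, and the class and holder names are illustrative.

public class WorkerIndexAssignmentSketch
{
    /** Result of assigning one registering worker (illustrative holder, not a Pixels class). */
    public static final class Assignment
    {
        public final int portIndex;           // becomes RegisterWorkerResponse.workerPortIndex
        public final int[] downstreamIndexes; // indexes into the downstream stage's worker list

        Assignment(int portIndex, int[] downstreamIndexes)
        {
            this.portIndex = portIndex;
            this.downstreamIndexes = downstreamIndexes;
        }
    }

    private final int fixedWorkerNum;      // workers in this stage
    private final int downStreamWorkerNum; // workers in the downstream stage
    private int workerIndexAssigner = 0;   // incremented once per assigned slot

    public WorkerIndexAssignmentSketch(int fixedWorkerNum, int downStreamWorkerNum)
    {
        this.fixedWorkerNum = fixedWorkerNum;
        this.downStreamWorkerNum = downStreamWorkerNum;
    }

    public synchronized Assignment addWorker()
    {
        if (fixedWorkerNum > 0 && downStreamWorkerNum > 0 && downStreamWorkerNum > fixedWorkerNum)
        {
            // One-to-multiple stream: each worker feeds ceil(downStreamWorkerNum / fixedWorkerNum)
            // downstream workers; the patch leaves this branch as TODO, so port slot 0 is assumed.
            int num = (downStreamWorkerNum + fixedWorkerNum - 1) / fixedWorkerNum;
            int[] indexes = new int[num];
            for (int i = 0; i < num; ++i)
            {
                indexes[i] = workerIndexAssigner % downStreamWorkerNum;
                workerIndexAssigner++;
            }
            return new Assignment(0, indexes);
        }
        else if (fixedWorkerNum > 0 && downStreamWorkerNum > 0)
        {
            // Multiple-to-one stream: workers sharing a downstream target get distinct port slots.
            int portIndex = workerIndexAssigner / downStreamWorkerNum;
            int[] indexes = {workerIndexAssigner % downStreamWorkerNum};
            workerIndexAssigner++;
            return new Assignment(portIndex, indexes);
        }
        // Assumed one-to-one stream otherwise. The patch's fallback builds its list via
        // new ArrayList<>(workerIndexAssigner), which only sets an initial capacity; the intent
        // appears to be recording the running index, which is what this sketch does.
        int[] indexes = {workerIndexAssigner};
        workerIndexAssigner++;
        return new Assignment(0, indexes);
    }

    public static void main(String[] args)
    {
        // Example: 4 upstream workers feeding 2 downstream workers (multiple-to-one).
        WorkerIndexAssignmentSketch stage = new WorkerIndexAssignmentSketch(4, 2);
        for (int i = 0; i < 4; ++i)
        {
            Assignment a = stage.addWorker();
            System.out.println("worker " + i + " -> downstream " + a.downstreamIndexes[0]
                    + ", port slot " + a.portIndex);
        }
        // Downstream targets alternate 0, 1, 0, 1; port slot is 0 for the first two workers
        // and 1 for the last two, matching the new StageCoordinator logic.
    }
}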