Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TableWriterRepalyer #11100

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2319,6 +2319,18 @@ folly::dynamic PlanNode::serialize() const {
return obj;
}

// Returns 'kEmptySources' (declared elsewhere in this file). The node gets its
// data from the trace directory rather than from upstream plan nodes.
const std::vector<PlanNodePtr>& QueryTraceScanNode::sources() const {
return kEmptySources;
}

// Returns the trace directory this node was constructed with. The string is
// returned by value, so every call makes a copy.
std::string QueryTraceScanNode::traceDir() const {
return traceDir_;
}

// Appends the node-specific details to 'stream' in the form
// "Trace dir: <traceDir_>".
void QueryTraceScanNode::addDetails(std::stringstream& stream) const {
stream << "Trace dir: " << traceDir_;
}

folly::dynamic FilterNode::serialize() const {
auto obj = PlanNode::serialize();
obj["filter"] = filter_->serialize();
Expand Down
35 changes: 35 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <fmt/format.h>

#include <utility>

#include "velox/connectors/Connector.h"
#include "velox/core/Expressions.h"
#include "velox/core/QueryConfig.h"
Expand Down Expand Up @@ -312,6 +314,39 @@ class ArrowStreamNode : public PlanNode {
std::shared_ptr<ArrowArrayStream> arrowStream_;
};

/// Plan node describing a scan over previously traced data. It carries the
/// directory the traced data lives in and the row type of that data. The node
/// cannot be serialized.
class QueryTraceScanNode final : public PlanNode {
 public:
  /// @param id Plan node id.
  /// @param traceDir Directory of the traced data for the traced plan node.
  /// @param outputType Row type of the traced data; also the output type of
  /// this node.
  QueryTraceScanNode(
      const PlanNodeId& id,
      const std::string& traceDir,
      const RowTypePtr& outputType)
      : PlanNode(id), traceDir_(traceDir), outputType_(outputType) {}

  /// Returns the row type passed at construction.
  const RowTypePtr& outputType() const override {
    return outputType_;
  }

  const std::vector<PlanNodePtr>& sources() const override;

  /// Display name of the node. Kept as "QueryReplayScan" since it is a runtime
  /// string that other code may match on.
  std::string_view name() const override {
    return "QueryReplayScan";
  }

  /// Always fails: this node type does not support serialization.
  folly::dynamic serialize() const override {
    VELOX_UNSUPPORTED("QueryTraceScanNode is not serializable");
    // NOTE(review): presumably unreachable because VELOX_UNSUPPORTED throws;
    // kept so the function has a return statement on every path.
    return nullptr;
  }

  /// Returns the trace directory passed at construction.
  std::string traceDir() const;

 private:
  void addDetails(std::stringstream& stream) const override;

  // Directory of traced data, which is $traceRoot/$taskId/$nodeId.
  const std::string traceDir_;
  // Row type of the traced data.
  const RowTypePtr outputType_;
};

class FilterNode : public PlanNode {
public:
FilterNode(const PlanNodeId& id, TypedExprPtr filter, PlanNodePtr source)
Expand Down
7 changes: 7 additions & 0 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "velox/exec/Unnest.h"
#include "velox/exec/Values.h"
#include "velox/exec/Window.h"
#include "velox/exec/trace/QueryTraceScan.h"

namespace facebook::velox::exec {

Expand Down Expand Up @@ -587,6 +588,12 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
assignUniqueIdNode,
assignUniqueIdNode->taskUniqueId(),
assignUniqueIdNode->uniqueIdCounter()));
} else if (
const auto queryReplayScanNode =
std::dynamic_pointer_cast<const core::QueryTraceScanNode>(
planNode)) {
operators.push_back(std::make_unique<trace::QueryTraceScan>(
id, ctx.get(), queryReplayScanNode));
} else {
std::unique_ptr<Operator> extended;
if (planNode->requiresExchangeClient()) {
Expand Down
12 changes: 12 additions & 0 deletions velox/exec/tests/utils/AssertQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ AssertQueryBuilder& AssertQueryBuilder::connectorSessionProperty(
return *this;
}

/// Bulk variant of connectorSessionProperty(): 'properties' maps a connector
/// id to that connector's key/value session properties, and every entry is
/// forwarded to connectorSessionProperty() one at a time. Returns this builder
/// to allow call chaining.
AssertQueryBuilder& AssertQueryBuilder::connectorSessionProperties(
    const std::unordered_map<
        std::string,
        std::unordered_map<std::string, std::string>>& properties) {
  for (const auto& connectorEntry : properties) {
    const auto& connectorId = connectorEntry.first;
    for (const auto& property : connectorEntry.second) {
      connectorSessionProperty(connectorId, property.first, property.second);
    }
  }
  return *this;
}

AssertQueryBuilder& AssertQueryBuilder::split(Split split) {
this->split(getOnlyLeafPlanNodeId(params_.planNode), std::move(split));
return *this;
Expand Down
5 changes: 5 additions & 0 deletions velox/exec/tests/utils/AssertQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class AssertQueryBuilder {
const std::string& key,
const std::string& value);

AssertQueryBuilder& connectorSessionProperties(
const std::unordered_map<
std::string,
std::unordered_map<std::string, std::string>>& properties);

// Methods to add splits.

/// Add a single split for the specified plan node.
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ PlanBuilder& PlanBuilder::values(
return *this;
}

/// Makes a core::QueryTraceScanNode the current plan node. The node gets the
/// next plan node id, 'traceNodeDir' as its trace directory and 'outputType'
/// as its output type. Any plan node previously held by the builder is
/// replaced. Returns this builder to allow call chaining.
PlanBuilder& PlanBuilder::traceScan(
    const std::string& traceNodeDir,
    const RowTypePtr& outputType) {
  auto traceScanNode = std::make_shared<core::QueryTraceScanNode>(
      nextPlanNodeId(), traceNodeDir, outputType);
  planNode_ = std::move(traceScanNode);
  return *this;
}

PlanBuilder& PlanBuilder::exchange(const RowTypePtr& outputType) {
VELOX_CHECK_NULL(planNode_, "Exchange must be the source node");
planNode_ =
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ class PlanBuilder {
bool parallelizable = false,
size_t repeatTimes = 1);

/// Adds a QueryTraceScanNode for query trace replay.
///
/// @param traceNodeDir The trace directory for a given plan node.
/// @param outputType The type of the tracing data.
PlanBuilder& traceScan(
const std::string& traceNodeDir,
const RowTypePtr& outputType);

/// Add an ExchangeNode.
///
/// Use capturePlanNodeId method to capture the node ID needed for adding
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ velox_add_library(
velox_query_trace_exec
QueryMetadataWriter.cpp
QueryTraceConfig.cpp
QueryDataReader.cpp
QueryDataWriter.cpp
QueryTraceScan.cpp
QueryTraceUtil.cpp)

velox_link_libraries(
Expand Down
24 changes: 10 additions & 14 deletions velox/exec/trace/QueryDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,23 @@
* limitations under the License.
*/

#include <utility>

#include "velox/exec/trace/QueryDataReader.h"

#include "velox/common/file/File.h"
#include "velox/exec/trace/QueryTraceTraits.h"

namespace facebook::velox::exec::trace {

QueryDataReader::QueryDataReader(std::string path, memory::MemoryPool* pool)
: path_(std::move(path)),
fs_(filesystems::getFileSystem(path_, nullptr)),
QueryDataReader::QueryDataReader(
std::string traceDir,
RowTypePtr dataType,
memory::MemoryPool* pool)
: traceDir_(std::move(traceDir)),
fs_(filesystems::getFileSystem(traceDir_, nullptr)),
dataType_(std::move(dataType)),
pool_(pool),
dataType_(getTraceDataType()),
dataStream_(getDataInputStream()) {
VELOX_CHECK_NOT_NULL(dataType_);
VELOX_CHECK_NOT_NULL(dataStream_);
Expand All @@ -42,19 +47,10 @@ bool QueryDataReader::read(RowVectorPtr& batch) const {
return true;
}

RowTypePtr QueryDataReader::getTraceDataType() const {
const auto summaryFile = fs_->openFileForRead(
fmt::format("{}/{}", path_, QueryTraceTraits::kDataSummaryFileName));
const auto summary = summaryFile->pread(0, summaryFile->size());
VELOX_USER_CHECK(!summary.empty());
folly::dynamic obj = folly::parseJson(summary);
return ISerializable::deserialize<RowType>(obj["rowType"]);
}

std::unique_ptr<common::FileInputStream> QueryDataReader::getDataInputStream()
const {
auto dataFile = fs_->openFileForRead(
fmt::format("{}/{}", path_, QueryTraceTraits::kDataFileName));
fmt::format("{}/{}", traceDir_, QueryTraceTraits::kDataFileName));
// TODO: Make the buffer size configurable.
return std::make_unique<common::FileInputStream>(
std::move(dataFile), 1 << 20, pool_);
Expand Down
11 changes: 6 additions & 5 deletions velox/exec/trace/QueryDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,26 @@ namespace facebook::velox::exec::trace {

class QueryDataReader {
public:
explicit QueryDataReader(std::string path, memory::MemoryPool* pool);
explicit QueryDataReader(
std::string traceDir,
RowTypePtr dataType,
memory::MemoryPool* pool);

/// Reads from 'dataStream_' and deserializes to 'batch'. Returns false if
/// reaches to end of the stream and 'batch' is set to nullptr.
bool read(RowVectorPtr& batch) const;

private:
RowTypePtr getTraceDataType() const;

std::unique_ptr<common::FileInputStream> getDataInputStream() const;

const std::string path_;
const std::string traceDir_;
const serializer::presto::PrestoVectorSerde::PrestoOptions readOptions_{
true,
common::CompressionKind_ZSTD, // TODO: Use trace config.
/*nullsFirst=*/true};
const std::shared_ptr<filesystems::FileSystem> fs_;
memory::MemoryPool* const pool_;
const RowTypePtr dataType_;
memory::MemoryPool* const pool_;
const std::unique_ptr<common::FileInputStream> dataStream_;
};
} // namespace facebook::velox::exec::trace
56 changes: 56 additions & 0 deletions velox/exec/trace/QueryTraceScan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/exec/trace/QueryTraceScan.h"

#include "QueryTraceUtil.h"

namespace facebook::velox::exec::trace {

/// Creates the replay scan operator for one driver.
///
/// The operator's output type and plan node id come from 'queryTraceScanNode'.
/// The data directory is resolved with getDataDir() from the node's trace
/// directory plus the pipeline id and driver id found in 'driverCtx'. A
/// QueryDataReader is then opened on that directory, using the memory
/// manager's trace pool.
QueryTraceScan::QueryTraceScan(
    int32_t operatorId,
    DriverCtx* driverCtx,
    const std::shared_ptr<const core::QueryTraceScanNode>& queryTraceScanNode)
    : SourceOperator(
          driverCtx,
          queryTraceScanNode->outputType(),
          operatorId,
          queryTraceScanNode->id(),
          "QueryReplayScan") {
  const auto& scanNode = *queryTraceScanNode;

  // The node only knows the directory up to the plan node; the pipeline and
  // driver ids select the data written for this particular driver.
  const auto driverDataDir = getDataDir(
      scanNode.traceDir(), driverCtx->pipelineId, driverCtx->driverId);

  memory::MemoryPool* const tracePool =
      memory::MemoryManager::getInstance()->tracePool();
  traceReader_ = std::make_unique<QueryDataReader>(
      driverDataDir, scanNode.outputType(), tracePool);
}

/// Returns the next batch read from the trace reader. When the reader reports
/// that nothing more can be read, marks the operator as finished and returns
/// nullptr.
RowVectorPtr QueryTraceScan::getOutput() {
  RowVectorPtr nextBatch;
  if (!traceReader_->read(nextBatch)) {
    finished_ = true;
    return nullptr;
  }
  return nextBatch;
}

// Returns true once getOutput() has failed to read another batch from the
// trace reader and set 'finished_'.
bool QueryTraceScan::isFinished() {
return finished_;
}

} // namespace facebook::velox::exec::trace
62 changes: 62 additions & 0 deletions velox/exec/trace/QueryTraceScan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "velox/core/PlanNode.h"
#include "velox/exec/Operator.h"
#include "velox/exec/trace/QueryDataReader.h"

namespace facebook::velox::exec::trace {
/// Source operator used for query replay. It reads traced data from a
/// per-driver directory:
/// $traceRoot/$taskId/$nodeId/$pipelineId/$driverId.
///
/// A single plan node can have multiple traced data files, one per pipeline ID
/// and driver ID, so both IDs are needed to find the right input data for
/// replaying. Pipeline IDs and driver IDs are sequential numbers starting from
/// zero.
///
/// The trace directory up to $nodeId ($traceRoot/$taskId/$nodeId) is provided
/// by QueryTraceScanNode::traceDir(). The pipeline ID and driver ID are only
/// known at operator creation, so the constructor resolves the final data
/// directory from the DriverCtx. The output type is taken from the plan node.
class QueryTraceScan final : public SourceOperator {
public:
/// @param operatorId Id of this operator.
/// @param driverCtx Driver context; supplies the pipeline ID and driver ID
/// used to locate the traced data.
/// @param queryTraceScanNode Plan node providing the trace directory, the
/// output type and the plan node id.
QueryTraceScan(
int32_t operatorId,
DriverCtx* driverCtx,
const std::shared_ptr<const core::QueryTraceScanNode>&
queryTraceScanNode);

/// Returns the next traced batch, or nullptr once the trace reader has no
/// more data, at which point the operator becomes finished.
RowVectorPtr getOutput() override;

/// This operator never reports itself as blocked.
BlockingReason isBlocked(ContinueFuture* /* unused */) override {
return BlockingReason::kNotBlocked;
}

/// Returns true after getOutput() has reached the end of the traced data.
bool isFinished() override;

private:
// Reader over the traced data of this driver; created in the constructor.
std::unique_ptr<QueryDataReader> traceReader_;
// Set by getOutput() when the reader returns no more batches.
bool finished_{false};
};

} // namespace facebook::velox::exec::trace
Loading
Loading