Skip to content

Commit

Permalink
[native] Add caching of parsed Types
Browse files Browse the repository at this point in the history
We've seen cases of queries that spend a large amount of time just parsing
types when converting the Presto Plan to Velox. This seems to be because it
parses the same large Row Types that are used across many field accesses.

Adding caching within a request shows a substantial decrease in the amount of
time it takes to do the conversion.

Notably, this helps with timeouts we're seeing making calls from the coordinator
to create tasks on the Workers.
  • Loading branch information
kevinwilfong authored and xiaoxmeng committed Nov 13, 2023
1 parent 6c48ec5 commit d1c5d83
Show file tree
Hide file tree
Showing 17 changed files with 272 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
#include <folly/Uri.h>
#include <folly/init/Init.h>
#include <folly/json.h>
#include "presto_cpp/main/types/ParseTypeSignature.h"
#include "velox/common/base/Fs.h"
#include "velox/common/encode/Base64.h"
#include "velox/common/file/FileSystems.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/serializers/PrestoSerializer.h"

// ANTLR defines an INVALID_INDEX macro, and DuckDB has a constant variable of
// the same name. So we have to include TypeParser.h after Velox.
// clang-format off
#include "presto_cpp/main/types/TypeParser.h"
// clang-format on

using namespace facebook::velox;

namespace facebook::presto::test {
Expand Down Expand Up @@ -122,9 +127,10 @@ class ServerResponse {

std::vector<std::string> names;
std::vector<TypePtr> types;
TypeParser parser;
for (const auto& column : response_["columns"]) {
names.push_back(column["name"].asString());
types.push_back(parseTypeSignature(column["type"].asString()));
types.push_back(parser.parse(column["type"].asString()));
}

auto rowType = ROW(std::move(names), std::move(types));
Expand Down
5 changes: 2 additions & 3 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
add_library(
presto_type_converter OBJECT
TypeSignatureTypeConverter.cpp antlr/TypeSignatureLexer.cpp
antlr/TypeSignatureParser.cpp)
presto_type_converter OBJECT TypeParser.cpp antlr/TypeSignatureLexer.cpp
antlr/TypeSignatureParser.cpp)

target_link_libraries(presto_type_converter velox_type)

Expand Down
45 changes: 24 additions & 21 deletions presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include <boost/algorithm/string/case_conv.hpp>
#include "presto_cpp/main/types/ParseTypeSignature.h"
#include "presto_cpp/presto_protocol/Base64Util.h"
#include "velox/common/base/Exceptions.h"
#include "velox/functions/prestosql/types/JsonType.h"
Expand Down Expand Up @@ -170,7 +169,8 @@ namespace {
std::optional<TypedExprPtr> tryConvertCast(
const protocol::Signature& signature,
const std::string& returnType,
const std::vector<TypedExprPtr>& args) {
const std::vector<TypedExprPtr>& args,
const TypeParser* typeParser) {
static const char* kCast = "presto.default.$operator$cast";
static const char* kTryCast = "presto.default.try_cast";
static const char* kJsonToArrayCast =
Expand All @@ -197,7 +197,7 @@ std::optional<TypedExprPtr> tryConvertCast(
signature.name.compare(kJsonToArrayCast) == 0 ||
signature.name.compare(kJsonToMapCast) == 0 ||
signature.name.compare(kJsonToRowCast) == 0) {
auto type = parseTypeSignature(returnType);
auto type = typeParser->parse(returnType);
return std::make_shared<CastTypedExpr>(
type,
std::vector<TypedExprPtr>{std::make_shared<CallTypedExpr>(
Expand All @@ -219,14 +219,15 @@ std::optional<TypedExprPtr> tryConvertCast(
return args[0];
}

auto type = parseTypeSignature(returnType);
auto type = typeParser->parse(returnType);
return std::make_shared<CastTypedExpr>(type, args, nullOnFailure);
}

std::optional<TypedExprPtr> tryConvertTry(
const protocol::Signature& signature,
const std::string& returnType,
const std::vector<TypedExprPtr>& args) {
const std::vector<TypedExprPtr>& args,
const TypeParser* typeParser) {
static const char* kTry = "presto.default.$internal$try";

if (signature.kind != protocol::FunctionKind::SCALAR) {
Expand All @@ -243,7 +244,7 @@ std::optional<TypedExprPtr> tryConvertTry(
VELOX_CHECK(lambda);
VELOX_CHECK_EQ(lambda->signature()->size(), 0);

auto type = parseTypeSignature(returnType);
auto type = typeParser->parse(returnType);
std::vector<TypedExprPtr> newArgs = {lambda->body()};
return std::make_shared<CallTypedExpr>(type, newArgs, "try");
}
Expand All @@ -252,7 +253,8 @@ std::optional<TypedExprPtr> tryConvertLiteralArray(
const protocol::Signature& signature,
const std::string& returnType,
const std::vector<TypedExprPtr>& args,
velox::memory::MemoryPool* pool) {
velox::memory::MemoryPool* pool,
const TypeParser* typeParser) {
static const char* kLiteralArray = "presto.default.$literal$array";
static const char* kFromBase64 = "presto.default.from_base64";

Expand All @@ -272,7 +274,7 @@ std::optional<TypedExprPtr> tryConvertLiteralArray(
return std::nullopt;
}

auto type = parseTypeSignature(returnType);
auto type = typeParser->parse(returnType);

auto encoded =
std::dynamic_pointer_cast<const ConstantTypedExpr>(call->inputs()[0]);
Expand Down Expand Up @@ -310,7 +312,7 @@ std::optional<TypedExprPtr> VeloxExprConverter::tryConvertDate(
// a VARCHAR or TIMESTAMP (with an optional timezone) type.
args.emplace_back(toVeloxExpr(pexpr.arguments[0]));

auto returnType = parseTypeSignature(pexpr.returnType);
auto returnType = typeParser_->parse(pexpr.returnType);
return std::make_shared<CastTypedExpr>(returnType, args, false);
}

Expand Down Expand Up @@ -360,7 +362,7 @@ std::optional<TypedExprPtr> VeloxExprConverter::tryConvertLike(
}

// Construct the returnType and CallTypedExpr for 'like'
auto returnType = parseTypeSignature(pexpr.returnType);
auto returnType = typeParser_->parse(pexpr.returnType);
return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(signature));
}
Expand All @@ -384,23 +386,24 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
auto args = toVeloxExpr(pexpr.arguments);
auto signature = builtin->signature;

auto cast = tryConvertCast(signature, pexpr.returnType, args);
auto cast = tryConvertCast(signature, pexpr.returnType, args, typeParser_);
if (cast.has_value()) {
return cast.value();
}

auto tryExpr = tryConvertTry(signature, pexpr.returnType, args);
auto tryExpr =
tryConvertTry(signature, pexpr.returnType, args, typeParser_);
if (tryExpr.has_value()) {
return tryExpr.value();
}

auto literal =
tryConvertLiteralArray(signature, pexpr.returnType, args, pool_);
auto literal = tryConvertLiteralArray(
signature, pexpr.returnType, args, pool_, typeParser_);
if (literal.has_value()) {
return literal.value();
}

auto returnType = parseTypeSignature(pexpr.returnType);
auto returnType = typeParser_->parse(pexpr.returnType);
return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(signature));

Expand All @@ -409,7 +412,7 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
std::dynamic_pointer_cast<protocol::SqlFunctionHandle>(
pexpr.functionHandle)) {
auto args = toVeloxExpr(pexpr.arguments);
auto returnType = parseTypeSignature(pexpr.returnType);
auto returnType = typeParser_->parse(pexpr.returnType);
return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(sqlFunctionHandle->functionId));
}
Expand All @@ -419,7 +422,7 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(

std::shared_ptr<const ConstantTypedExpr> VeloxExprConverter::toVeloxExpr(
std::shared_ptr<protocol::ConstantExpression> pexpr) const {
const auto type = parseTypeSignature(pexpr->type);
const auto type = typeParser_->parse(pexpr->type);
switch (type->kind()) {
case TypeKind::ROW:
FOLLY_FALLTHROUGH;
Expand Down Expand Up @@ -676,7 +679,7 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
return convertInExpr(args, pool_);
}

auto returnType = parseTypeSignature(pexpr->returnType);
auto returnType = typeParser_->parse(pexpr->returnType);

if (pexpr->form == protocol::Form::SWITCH) {
return convertSwitchExpr(returnType, std::move(args));
Expand All @@ -703,15 +706,15 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
std::shared_ptr<const FieldAccessTypedExpr> VeloxExprConverter::toVeloxExpr(
std::shared_ptr<protocol::VariableReferenceExpression> pexpr) const {
return std::make_shared<FieldAccessTypedExpr>(
parseTypeSignature(pexpr->type), pexpr->name);
typeParser_->parse(pexpr->type), pexpr->name);
}

std::shared_ptr<const LambdaTypedExpr> VeloxExprConverter::toVeloxExpr(
std::shared_ptr<protocol::LambdaDefinitionExpression> lambda) const {
std::vector<velox::TypePtr> argumentTypes;
argumentTypes.reserve(lambda->argumentTypes.size());
for (auto& typeName : lambda->argumentTypes) {
argumentTypes.emplace_back(parseTypeSignature(typeName));
argumentTypes.emplace_back(typeParser_->parse(typeName));
}

// TODO(spershin): In some cases we can visit this method with the same lambda
Expand All @@ -728,7 +731,7 @@ std::shared_ptr<const LambdaTypedExpr> VeloxExprConverter::toVeloxExpr(
std::shared_ptr<const FieldAccessTypedExpr> VeloxExprConverter::toVeloxExpr(
const protocol::VariableReferenceExpression& pexpr) const {
return std::make_shared<FieldAccessTypedExpr>(
parseTypeSignature(pexpr.type), pexpr.name);
typeParser_->parse(pexpr.type), pexpr.name);
}

TypedExprPtr VeloxExprConverter::toVeloxExpr(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
#pragma once

#include <stdexcept>
// antlr-common.h undefines the EOF macro that external/json/nlohmann/json.hpp
// relies on, so include presto_protcol.h before TypeParser.h
// clang-format off
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "presto_cpp/main/types/TypeParser.h"
// clang-format on
#include "velox/core/Expressions.h"

namespace facebook::presto {

class VeloxExprConverter {
public:
explicit VeloxExprConverter(velox::memory::MemoryPool* pool) : pool_(pool) {}
VeloxExprConverter(velox::memory::MemoryPool* pool, TypeParser* typeParser)
: pool_(pool), typeParser_(typeParser) {}

std::shared_ptr<const velox::core::ConstantTypedExpr> toVeloxExpr(
std::shared_ptr<protocol::ConstantExpression> pexpr) const;
Expand Down Expand Up @@ -60,7 +66,8 @@ class VeloxExprConverter {
std::optional<velox::core::TypedExprPtr> tryConvertDate(
const protocol::CallExpression& pexpr) const;

velox::memory::MemoryPool* pool_;
velox::memory::MemoryPool* const pool_;
TypeParser* const typeParser_;
};

} // namespace facebook::presto
Loading

0 comments on commit d1c5d83

Please sign in to comment.