Skip to content

Commit

Permalink
Add support for REST based remote functions
Browse files Browse the repository at this point in the history
  • Loading branch information
wills-feng authored and Joe-Abraham committed Sep 2, 2024
1 parent 31b2995 commit 433fc9a
Show file tree
Hide file tree
Showing 15 changed files with 895 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ option(VELOX_ENABLE_ABFS "Build Abfs Connector" OFF)
option(VELOX_ENABLE_HDFS "Build Hdfs Connector" OFF)
option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF)
option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" ON)
option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON)

option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF)
Expand Down
7 changes: 3 additions & 4 deletions velox/common/config/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if (${VELOX_BUILD_TESTING})
if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
endif ()
endif()

velox_add_library(velox_common_config Config.cpp)
velox_link_libraries(
velox_common_config
PUBLIC velox_common_base
velox_exception
PUBLIC velox_common_base velox_exception
PRIVATE re2::re2)
9 changes: 9 additions & 0 deletions velox/functions/remote/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if(NOT DEFINED PROXYGEN_LIBRARIES)
find_package(Sodium REQUIRED)
find_library(PROXYGEN proxygen)
find_library(PROXYGEN_HTTP_SERVER proxygenhttpserver)
find_library(FIZZ fizz)
find_library(WANGLE wangle)
set(PROXYGEN_LIBRARIES ${PROXYGEN_HTTP_SERVER} ${PROXYGEN} ${WANGLE} ${FIZZ})
endif()

add_subdirectory(if)
add_subdirectory(client)
add_subdirectory(server)
5 changes: 5 additions & 0 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp)
velox_link_libraries(velox_functions_remote_thrift_client
PUBLIC remote_function_thrift FBThrift::thriftcpp2)

velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client ${PROXYGEN_LIBRARIES}
Folly::folly)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_expression
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
Expand Down
97 changes: 93 additions & 4 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <folly/io/async/EventBase.h>
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
Expand All @@ -33,17 +34,34 @@ std::string serializeType(const TypePtr& type) {
return type::fbhive::HiveTypeSerializer::serialize(type);
}

std::string iobufToString(const folly::IOBuf& buf) {
std::string result;
result.reserve(buf.computeChainDataLength());

for (auto range : buf) {
result.append(reinterpret_cast<const char*>(range.data()), range.size());
}

return result;
}

class RemoteFunction : public exec::VectorFunction {
public:
RemoteFunction(
const std::string& functionName,
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
location_(metadata.location),
thriftClient_(getThriftClient(location_, &eventBase_)),
serdeFormat_(metadata.serdeFormat),
serde_(getSerde(serdeFormat_)) {
if (metadata.location.type() == typeid(SocketAddress)) {
location_ = boost::get<SocketAddress>(metadata.location);
thriftClient_ = getThriftClient(location_, &eventBase_);
} else if (metadata.location.type() == typeid(URL)) {
url_ = boost::get<URL>(metadata.location);
restClient_ = std::make_unique<RestClient>(url_.getUrl());
}

std::vector<TypePtr> types;
types.reserve(inputArgs.size());
serializedInputTypes_.reserve(inputArgs.size());
Expand All @@ -62,7 +80,11 @@ class RemoteFunction : public exec::VectorFunction {
exec::EvalCtx& context,
VectorPtr& result) const override {
try {
applyRemote(rows, args, outputType, context, result);
if (thriftClient_) {
applyRemote(rows, args, outputType, context, result);
} else if (restClient_) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
throw;
} catch (const std::exception&) {
Expand All @@ -71,6 +93,69 @@ class RemoteFunction : public exec::VectorFunction {
}

private:
void applyRestRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const {
try {
std::string responseBody;
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

/// construct json request
folly::dynamic remoteFunctionHandle = folly::dynamic::object;
remoteFunctionHandle["functionName"] = functionName_;
remoteFunctionHandle["returnType"] = serializeType(outputType);
remoteFunctionHandle["argumentTypes"] = folly::dynamic::array;
for (const auto& value : serializedInputTypes_) {
remoteFunctionHandle["argumentTypes"].push_back(value);
}

folly::dynamic inputs = folly::dynamic::object;
inputs["pageFormat"] = static_cast<int>(serdeFormat_);
// use existing serializer(Prestopage or Sparkunsaferow)
inputs["payload"] = iobufToString(rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), serde_.get()));
inputs["rowCount"] = remoteRowVector->size();

folly::dynamic jsonObject = folly::dynamic::object;
jsonObject["remoteFunctionHandle"] = remoteFunctionHandle;
jsonObject["inputs"] = inputs;
jsonObject["throwOnError"] = context.throwOnError();

// call Rest client to send request
restClient_->invoke_function(folly::toJson(jsonObject), responseBody);
LOG(INFO) << responseBody;

// parse json response
auto responseJsonObj = parseJson(responseBody);
if (responseJsonObj.count("err") > 0) {
VELOX_NYI(responseJsonObj["err"].asString());
}

auto payloadIObuf = folly::IOBuf::copyBuffer(
responseJsonObj["result"]["payload"].asString());

// use existing deserializer(Prestopage or Sparkunsaferow)
auto outputRowVector = IOBufToRowVector(
*payloadIObuf, ROW({outputType}), *context.pool(), serde_.get());
result = outputRowVector->childAt(0);

} catch (const std::exception& e) {
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
functionName_,
url_.getUrl(),
e.what());
}
}

void applyRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
Expand Down Expand Up @@ -122,10 +207,14 @@ class RemoteFunction : public exec::VectorFunction {
}

const std::string functionName_;
folly::SocketAddress location_;

folly::EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;
folly::SocketAddress location_;

std::unique_ptr<RestClient> restClient_;
proxygen::URL url_;

remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde> serde_;

Expand Down
9 changes: 6 additions & 3 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@

#pragma once

#include <boost/variant.hpp>
#include <folly/SocketAddress.h>
#include <proxygen/lib/utils/URL.h>
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunction_types.h"

namespace facebook::velox::functions {

struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// Network address of the servr to communicate with. Note that this can hold
/// a network location (ip/port pair) or a unix domain socket path (see
/// URL of the HTTP/REST server for remote function.
/// Or Network address of the servr to communicate with. Note that this can
/// hold a network location (ip/port pair) or a unix domain socket path (see
/// SocketAddress::makeFromPath()).
folly::SocketAddress location;
boost::variant<folly::SocketAddress, proxygen::URL> location;

/// The serialization format to be used
remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE};
Expand Down
35 changes: 35 additions & 0 deletions velox/functions/remote/client/RestClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/functions/remote/client/RestClient.h"
#include <proxygen/lib/utils/URL.h>

using namespace facebook::velox::functions;

namespace facebook::velox::functions {

RestClient::RestClient(const std::string& url) : url_(url) {
httpClient_ = std::make_shared<HttpClient>(url_);
};

void RestClient::invoke_function(
const std::string& requestBody,
std::string& responseBody) {
httpClient_->send(requestBody);
responseBody = httpClient_->getResponseBody();
LOG(INFO) << responseBody;
};

} // namespace facebook::velox::functions
129 changes: 129 additions & 0 deletions velox/functions/remote/client/RestClient.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <folly/init/Init.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventBaseManager.h>
#include <folly/io/async/SSLContext.h>
#include <folly/json.h>
#include <proxygen/lib/http/HTTPConnector.h>
#include <proxygen/lib/http/HTTPMessage.h>
#include <proxygen/lib/http/session/HTTPUpstreamSession.h>
#include <proxygen/lib/utils/URL.h>
#include "velox/functions/remote/client/RestClient.h"

using namespace proxygen;
using namespace folly;

namespace facebook::velox::functions {

class HttpClient : public HTTPConnector::Callback,
public HTTPTransactionHandler {
public:
HttpClient(const URL& url) : url_(url) {}

void send(std::string requestBody) {
requestBody_ = requestBody;
connector_ = std::make_unique<proxygen::HTTPConnector>(
this, WheelTimerInstance(std::chrono::milliseconds(1000)));
connector_->connect(
&evb_,
SocketAddress(url_.getHost(), url_.getPort(), true),
std::chrono::milliseconds(10000));
evb_.loop();
}

std::string getResponseBody() {
return std::move(responseBody_);
}

private:
URL url_;
EventBase evb_;
std::unique_ptr<HTTPConnector> connector_;
std::shared_ptr<HTTPUpstreamSession> session_;
std::string requestBody_;
std::string responseBody_;

void connectSuccess(HTTPUpstreamSession* session) noexcept override {
session_ = std::shared_ptr<HTTPUpstreamSession>(
session, [](HTTPUpstreamSession* s) {
// No-op deleter, managed by Proxygen
});
sendRequest();
}

void connectError(const folly::AsyncSocketException& ex) noexcept override {
LOG(ERROR) << "Failed to connect: " << ex.what();
evb_.terminateLoopSoon();
}

void sendRequest() {
auto txn = session_->newTransaction(this);
HTTPMessage req;
req.setMethod(HTTPMethod::POST);
req.setURL(url_.getUrl());
req.getHeaders().add(HTTP_HEADER_CONTENT_TYPE, "application/json");
req.getHeaders().add(
HTTP_HEADER_CONTENT_LENGTH, std::to_string(requestBody_.size()));
req.getHeaders().add(HTTP_HEADER_USER_AGENT, "Velox HTTPClient");

txn->sendHeaders(req);
txn->sendBody(folly::IOBuf::copyBuffer(requestBody_));
txn->sendEOM();
}

void setTransaction(HTTPTransaction*) noexcept override {}
void detachTransaction() noexcept override {
session_.reset();
evb_.terminateLoopSoon();
}

void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept override {}

void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept override {
if (chain) {
responseBody_.append(
reinterpret_cast<const char*>(chain->data()), chain->length());
}
}

void onEOM() noexcept override {
session_->drain();
}

void onError(const HTTPException& error) noexcept override {
LOG(ERROR) << "Error: " << error.what();
}
void onUpgrade(UpgradeProtocol) noexcept override {}
void onTrailers(std::unique_ptr<HTTPHeaders>) noexcept override {}
void onEgressPaused() noexcept override {}
void onEgressResumed() noexcept override {}
};

class RestClient {
public:
RestClient(const std::string& url);
void invoke_function(const std::string& request, std::string& response);

private:
URL url_;
std::shared_ptr<HttpClient> httpClient_;
};

} // namespace facebook::velox::functions
Loading

0 comments on commit 433fc9a

Please sign in to comment.