Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-33212: [C++][Python] Add use_threads to pyarrow.substrait.run_query #33623

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 52 additions & 35 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,23 +563,28 @@ Future<std::shared_ptr<Table>> DeclarationToTableAsync(Declaration declaration,
return exec_plan->finished().Then([exec_plan, output_table] { return *output_table; });
}

// Convenience overload: `use_threads` selects between the shared CPU thread
// pool and a freshly created single-thread pool (the async variant cannot run
// on the calling thread).
//
// \param declaration the plan to run, a sink node is appended internally
// \param use_threads if true, compute work runs on the process-wide CPU pool;
//                    if false, a dedicated single-thread pool serializes it
// \param memory_pool pool used for allocations made while running the plan
// \param function_registry registry used for function lookup (null = default)
Future<std::shared_ptr<Table>> DeclarationToTableAsync(
    Declaration declaration, bool use_threads, MemoryPool* memory_pool,
    FunctionRegistry* function_registry) {
  if (use_threads) {
    ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
                    function_registry);
    return DeclarationToTableAsync(std::move(declaration), ctx);
  } else {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
    ExecContext ctx(memory_pool, tpool.get(), function_registry);
    // The continuation captures `tpool` so the pool outlives the running plan.
    return DeclarationToTableAsync(std::move(declaration), ctx)
        .Then([tpool](const std::shared_ptr<Table>& table) { return table; });
  }
}

// Synchronous variant: runs the declaration to completion and returns the
// collected table. When use_threads=false, RunSynchronously drives the plan
// on the calling thread.
Result<std::shared_ptr<Table>> DeclarationToTable(Declaration declaration,
                                                  bool use_threads,
                                                  MemoryPool* memory_pool,
                                                  FunctionRegistry* function_registry) {
  return ::arrow::internal::RunSynchronously<Future<std::shared_ptr<Table>>>(
      [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
        ExecContext ctx(memory_pool, executor, function_registry);
        return DeclarationToTableAsync(std::move(declaration), ctx);
      },
      use_threads);
}
Expand All @@ -594,12 +599,15 @@ Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
}

// Convenience overload of DeclarationToBatchesAsync; see DeclarationToTableAsync
// for the threading semantics of `use_threads`.
Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
    Declaration declaration, bool use_threads, MemoryPool* memory_pool,
    FunctionRegistry* function_registry) {
  if (use_threads) {
    ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
                    function_registry);
    return DeclarationToBatchesAsync(std::move(declaration), ctx);
  } else {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
    ExecContext ctx(memory_pool, tpool.get(), function_registry);
    // Keep the single-thread pool alive until the plan finishes.
    return DeclarationToBatchesAsync(std::move(declaration), ctx)
        .Then([tpool](const std::vector<std::shared_ptr<RecordBatch>>& batches) {
          return batches;
        });
  }
}

// Synchronous variant of DeclarationToBatchesAsync; collects all output record
// batches before returning.
Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
    Declaration declaration, bool use_threads, MemoryPool* memory_pool,
    FunctionRegistry* function_registry) {
  return ::arrow::internal::RunSynchronously<
      Future<std::vector<std::shared_ptr<RecordBatch>>>>(
      [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
        ExecContext ctx(memory_pool, executor, function_registry);
        return DeclarationToBatchesAsync(std::move(declaration), ctx);
      },
      use_threads);
}
Expand Down Expand Up @@ -641,24 +650,27 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(Declaration declar
});
}

// Convenience overload of DeclarationToExecBatchesAsync; see
// DeclarationToTableAsync for the threading semantics of `use_threads`.
Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
    Declaration declaration, bool use_threads, MemoryPool* memory_pool,
    FunctionRegistry* function_registry) {
  if (use_threads) {
    ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
                    function_registry);
    return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
  } else {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
    ExecContext ctx(memory_pool, tpool.get(), function_registry);
    // Keep the single-thread pool alive until the plan finishes.
    return DeclarationToExecBatchesAsync(std::move(declaration), ctx)
        .Then([tpool](const BatchesWithCommonSchema& batches) { return batches; });
  }
}

// Synchronous variant of DeclarationToExecBatchesAsync.
Result<BatchesWithCommonSchema> DeclarationToExecBatches(
    Declaration declaration, bool use_threads, MemoryPool* memory_pool,
    FunctionRegistry* function_registry) {
  return ::arrow::internal::RunSynchronously<Future<BatchesWithCommonSchema>>(
      [=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
        ExecContext ctx(memory_pool, executor, function_registry);
        return DeclarationToExecBatchesAsync(std::move(declaration), ctx);
      },
      use_threads);
}
Expand All @@ -680,20 +692,25 @@ Future<> DeclarationToStatusAsync(Declaration declaration, ExecContext exec_cont
return exec_plan->finished().Then([exec_plan]() {});
}

Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads) {
Future<> DeclarationToStatusAsync(Declaration declaration, bool use_threads,
MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
if (use_threads) {
return DeclarationToStatusAsync(std::move(declaration), *threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx);
} else {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> tpool, ThreadPool::Make(1));
ExecContext ctx(default_memory_pool(), tpool.get());
ExecContext ctx(memory_pool, tpool.get(), function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx).Then([tpool]() {});
}
}

Status DeclarationToStatus(Declaration declaration, bool use_threads) {
Status DeclarationToStatus(Declaration declaration, bool use_threads,
MemoryPool* memory_pool, FunctionRegistry* function_registry) {
return ::arrow::internal::RunSynchronously<Future<>>(
[declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(default_memory_pool(), executor);
[=, declaration = std::move(declaration)](::arrow::internal::Executor* executor) {
ExecContext ctx(memory_pool, executor, function_registry);
return DeclarationToStatusAsync(std::move(declaration), ctx);
},
use_threads);
Expand Down Expand Up @@ -738,11 +755,9 @@ struct BatchConverter {
};

Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGenerator(
Declaration declaration, ::arrow::internal::Executor* executor,
std::shared_ptr<Schema>* out_schema) {
Declaration declaration, ExecContext exec_ctx, std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_context(default_memory_pool(), executor);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_context));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
Expand All @@ -754,14 +769,16 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGen
}
} // namespace

Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration declaration,
bool use_threads) {
Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, bool use_threads, MemoryPool* memory_pool,
FunctionRegistry* function_registry) {
std::shared_ptr<Schema> schema;
auto batch_iterator = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
[&](::arrow::internal::Executor* executor)
-> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
return DeclarationToRecordBatchGenerator(declaration, executor, &schema);
ExecContext exec_ctx(memory_pool, executor, function_registry);
return DeclarationToRecordBatchGenerator(declaration, exec_ctx, &schema);
},
use_threads));

Expand Down
68 changes: 49 additions & 19 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,24 +426,36 @@ struct ARROW_EXPORT Declaration {

/// \brief Utility method to run a declaration and collect the results into a table
///
/// \param use_threads If `use_threads` is false then all CPU work will be done on the
///                    calling thread. I/O tasks will still happen on the I/O executor
///                    and may be multi-threaded (but should not use significant CPU
///                    resources)
/// \param memory_pool The memory pool to use for allocations made while running the plan.
/// \param function_registry The function registry to use for function execution. If null
///                          then the default function registry will be used.
///
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
ARROW_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToTable
///
/// The behavior of use_threads is slightly different than the synchronous version since
/// we cannot run synchronously on the calling thread. Instead, if use_threads=false then
/// a new thread pool will be created with a single thread and this will be used for all
/// compute work.
/// \param use_threads The behavior of use_threads is slightly different than the
/// synchronous version since we cannot run synchronously on the
/// calling thread. Instead, if use_threads=false then a new thread
westonpace marked this conversation as resolved.
Show resolved Hide resolved
/// pool will be created with a single thread and this will be used for
/// all compute work.
/// \param memory_pool The memory pool to use for allocations made while running the plan.
/// \param function_registry The function registry to use for function execution. If null
westonpace marked this conversation as resolved.
Show resolved Hide resolved
/// then the default function registry will be used.
ARROW_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
Declaration declaration, bool use_threads = true);
Declaration declaration, bool use_threads = true,
MemoryPool* memory_pool = default_memory_pool(),
FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToTableAsync accepting a custom exec context
///
Expand All @@ -463,13 +475,17 @@ struct BatchesWithCommonSchema {
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToExecBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context
///
Expand All @@ -481,13 +497,17 @@ ARROW_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatchesAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToBatchesAsync accepting a custom exec context
///
Expand All @@ -511,24 +531,34 @@ ARROW_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatc
///
/// If a custom exec context is provided then the value of `use_threads` will be ignored.
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToReader accepting a custom exec context
ARROW_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
Declaration declaration, ExecContext exec_context);

/// \brief Utility method to run a declaration and ignore results
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTable for details on threading & execution
ARROW_EXPORT Status DeclarationToStatus(Declaration declaration, bool use_threads = true,
                                        MemoryPool* memory_pool = default_memory_pool(),
                                        FunctionRegistry* function_registry = NULLPTR);

/// \brief Asynchronous version of \see DeclarationToStatus
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_EXPORT Future<> DeclarationToStatusAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToStatusAsync accepting a custom exec context
///
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ DeclarationFactory MakeWriteDeclarationFactory(
};
}

// Declaration factory that returns the deserialized declaration unchanged (no
// sink node is appended).  The field names assigned by the Substrait RootRel
// are intentionally ignored because there is no sink to apply them to; the
// parameter is left unnamed to avoid an unused-parameter warning.
DeclarationFactory MakeNoSinkDeclarationFactory() {
  return [](compute::Declaration input,
            std::vector<std::string> /*names*/) -> Result<compute::Declaration> {
    // Explicit move: C++17 does not guarantee implicit move through the
    // converting Result constructor when returning a named parameter.
    return std::move(input);
  };
}

// FIXME - Replace with actual version that includes the change
constexpr uint32_t kMinimumMajorVersion = 0;
constexpr uint32_t kMinimumMinorVersion = 19;
Expand Down Expand Up @@ -188,6 +195,21 @@ Result<std::vector<compute::Declaration>> DeserializePlans(
registry, ext_set_out, conversion_options);
}

// Deserializes a Substrait Plan into a single sink-less Declaration.
// Fails if the plan has zero RootRels or more than one top-level declaration,
// since the caller expects exactly one runnable declaration.
ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
    const Buffer& buf, const ExtensionIdRegistry* registry, ExtensionSet* ext_set_out,
    const ConversionOptions& conversion_options) {
  ARROW_ASSIGN_OR_RAISE(std::vector<compute::Declaration> top_level_decls,
                        DeserializePlans(buf, MakeNoSinkDeclarationFactory(), registry,
                                         ext_set_out, conversion_options));
  if (top_level_decls.empty()) {
    return Status::Invalid("No RootRel in plan");
  }
  if (top_level_decls.size() != 1) {
    return Status::Invalid("Multiple top level declarations found in Substrait plan");
  }
  // Move the element out of the local vector to avoid copying the declaration.
  return std::move(top_level_decls[0]);
}

namespace {

Result<std::shared_ptr<compute::ExecPlan>> MakeSingleDeclarationPlan(
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/engine/substrait/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,23 @@ ARROW_ENGINE_EXPORT Result<std::shared_ptr<compute::ExecPlan>> DeserializePlan(
const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR,
const ConversionOptions& conversion_options = {});

/// \brief Deserializes a Substrait Plan message to a declaration
westonpace marked this conversation as resolved.
Show resolved Hide resolved
///
/// The plan will not contain any sink nodes and will be suitable for use in any
/// of the arrow::compute::DeclarationTo... methods.
westonpace marked this conversation as resolved.
Show resolved Hide resolved
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
/// message
/// \param[in] registry an extension-id-registry to use, or null for the default one.
/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
/// Plan is returned here.
/// \param[in] conversion_options options to control how the conversion is to be done.
/// \return A declaration representing the Substrait plan
ARROW_ENGINE_EXPORT Result<compute::Declaration> DeserializePlan(
const Buffer& buf, const ExtensionIdRegistry* registry = NULLPTR,
ExtensionSet* ext_set_out = NULLPTR,
const ConversionOptions& conversion_options = {});

/// \brief Deserializes a Substrait Type message to the corresponding Arrow type
///
/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type
Expand Down
Loading