GH-33212: [C++][Python] Add use_threads to pyarrow.substrait.run_query #33623
Conversation
… & function_registry to the various DeclarationToXyz methods
CC @vibhatha mind taking a look?
Sure @westonpace, I will take a look now.
if (use_threads) {
  return DeclarationToTableAsync(std::move(declaration), *threaded_exec_context());
ExecContext ctx(memory_pool, ::arrow::internal::GetCpuThreadPool(),
Just curious, is this always going to be using the CPU thread pool?
Yes. If the user is passing in bool use_threads, then they are choosing between "current thread" and "CPU thread pool". This keeps the simple case simple for most users. If the user doesn't want the CPU thread pool and doesn't want to do everything on the calling thread, they can use the overload that takes a custom ExecContext.
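The dispatch pattern described above can be sketched in plain Python. This is an illustrative stand-in (using `concurrent.futures`, with a hypothetical `run_plan` function and a zero-argument callable in place of a real declaration), not PyArrow's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

# A shared pool standing in for Arrow's CPU thread pool.
_cpu_pool = ThreadPoolExecutor(max_workers=4)

def run_plan(plan, use_threads=True):
    """Run `plan` (a zero-argument callable standing in for a declaration)
    either on the shared pool or on the calling thread."""
    if use_threads:
        # "CPU thread pool" case: hand the work to the shared executor.
        return _cpu_pool.submit(plan).result()
    # "current thread" case: run synchronously on the caller's thread.
    return plan()

print(run_plan(lambda: sum(range(10))))                     # threaded path
print(run_plan(lambda: sum(range(10)), use_threads=False))  # serial path
```

A third option, taking a caller-supplied executor instead of a bool, would correspond to the overload accepting a custom ExecContext.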
namespace {

/// \brief A SinkNodeConsumer specialized to output ExecBatches via PushGenerator
class SubstraitSinkConsumer : public compute::SinkNodeConsumer {
I remember doing a similar cleanup (based on your suggestion) in another PR (surely not merged yet), but I cannot remember which one. Either way, it is better that we get this cleaned up; as I recall, this was one of your suggestions.
Yes, that sounds likely. If you can remember the PR I can take another look.
But it is probably better to merge your version; it is in the simplest form.
ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
    compute::FunctionRegistry* func_registry = NULLPTR,
    const ConversionOptions& conversion_options = {});
    const ConversionOptions& conversion_options = {}, bool use_threads = true,
Are we going to use threads by default? Is that okay? I mean, depending on the use case, callers may already be running things in parallel. Just curious about the usage.
The default is generally to maximize performance at whatever expense to CPU & RAM. I think this is ok. Users usually want things to run as quickly as possible.
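The nested-parallelism concern above has a common escape hatch: a caller that already fans work out across its own pool can opt each call out of internal threading. A hedged stdlib sketch (with a hypothetical `run_query_stub` standing in for a per-query entry point such as run_query; the real call would execute a Substrait plan):

```python
from concurrent.futures import ThreadPoolExecutor

def run_query_stub(n, use_threads=True):
    # Stand-in for a per-query entry point; the real call would execute
    # a Substrait plan.  Here it just squares n.
    return n * n

# The caller already parallelizes across queries with its own pool, so
# each individual query opts out of internal threading to avoid
# oversubscribing the CPU.
with ThreadPoolExecutor(max_workers=4) as outer:
    results = list(outer.map(
        lambda n: run_query_stub(n, use_threads=False), range(5)))

print(results)  # [0, 1, 4, 9, 16]
```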
python/pyarrow/_substrait.pyx (outdated)
@@ -39,7 +39,7 @@ cdef CDeclaration _create_named_table_provider(dict named_args, const std_vector
        c_name = names[i]
        py_names.append(frombytes(c_name))

        py_table = named_args["provider"](py_names)
Not sure why this is indented? Why should it run within the loop?
Oops, good catch! I was experimenting with this function for something unrelated and didn't notice the change because I was hiding whitespace changes when doing self-review.
Thanks, I have reverted this.
@westonpace thank you for working on this. I added a few suggestions. Except for those minor changes, I think the current changes are good to go.
Co-authored-by: Vibhatha Lakmal Abeykoon <vibhatha@users.noreply.github.com>
https://github.com/apache/arrow/actions/runs/3907791966/jobs/6677356655
/arrow/cpp/src/arrow/compute/exec/exec_plan.h:427: error: The following parameter of arrow::compute::DeclarationToTable(Declaration declaration, bool use_threads=true, MemoryPool *memory_pool=default_memory_pool(), FunctionRegistry *function_registry=NULLPTR) is not documented:
parameter 'declaration' (warning treated as error, aborting now)
I think this is the issue. I have created an issue here: #33649. I will work on this.
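The CI failure above comes from a documentation-completeness check: every parameter of a documented function must itself be documented. The same discipline applies to Python docstrings; a hedged, illustrative stand-in for the DeclarationToTable signature (names mirror the C++ API, but this function is hypothetical):

```python
def declaration_to_table(declaration, use_threads=True,
                         memory_pool=None, function_registry=None):
    """Illustrative stand-in for DeclarationToTable's signature.

    Parameters
    ----------
    declaration : object
        The plan to execute (the parameter the Doxygen check flagged
        as undocumented in the C++ header).
    use_threads : bool, default True
        If True, run on the CPU thread pool; otherwise run on the
        calling thread.
    memory_pool : object, optional
        Memory pool used for allocations during execution.
    function_registry : object, optional
        Registry used to look up functions referenced by the plan.
    """
    # Echo the arguments so the signature is easy to exercise.
    return (declaration, use_threads, memory_pool, function_registry)
```

Documenting every parameter up front avoids exactly this class of warning-as-error build break.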
I created the PR here: #33651 Let's wait for the CIs.
Benchmark runs are scheduled for baseline = 97998d8 and contender = e1027dc. e1027dc is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Also adds memory_pool and function_registry to the various DeclarationToXyz methods. Converts ExecuteSerializedPlan to DeclarationToReader instead of the bespoke thing it was doing before.