Skip to content

Commit

Permalink
Merge branch 'main' into apacheGH-34252
Browse files Browse the repository at this point in the history
  • Loading branch information
davisusanibar committed Jun 1, 2023
2 parents 892dedc + c67fb39 commit b0fb9a5
Show file tree
Hide file tree
Showing 117 changed files with 8,522 additions and 4,675 deletions.
14 changes: 4 additions & 10 deletions .github/workflows/dev_pr/link.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,16 @@ async function commentGitHubURL(github, context, pullRequestNumber, issueID) {
// Make the call to ensure issue exists before adding comment
const issueInfo = await helpers.getGitHubInfo(github, context, issueID, pullRequestNumber);
const message = "* Closes: #" + issueInfo.number
if (await haveComment(github, context, pullRequestNumber, message)) {
return;
}
if (issueInfo){
if (issueInfo) {
if (context.payload.pull_request.body.includes(message)) {
return;
}
await github.pulls.update({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: pullRequestNumber,
body: (context.payload.pull_request.body || "") + "\n" + message
});
await github.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: pullRequestNumber,
body: message
});
}
}

Expand Down
6 changes: 4 additions & 2 deletions cpp/cmake_modules/SetupCxxFlags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,10 @@ if(NOT MSVC)
string(APPEND CMAKE_CXX_FLAGS_RELEASE "${CXX_RELEASE_FLAGS}")
string(APPEND CMAKE_C_FLAGS_DEBUG "${DEBUG_FLAGS}")
string(APPEND CMAKE_CXX_FLAGS_DEBUG "${DEBUG_FLAGS}")
string(APPEND CMAKE_C_FLAGS_RELWITHDEBINFO "${C_RELEASE_FLAGS} ${DEBUG_FLAGS}")
string(APPEND CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CXX_RELEASE_FLAGS} ${DEBUG_FLAGS}")
# We must put release flags after debug flags to use optimization
# flags in release flags. RelWithDebInfo must enable optimization.
string(APPEND CMAKE_C_FLAGS_RELWITHDEBINFO "${DEBUG_FLAGS} ${C_RELEASE_FLAGS}")
string(APPEND CMAKE_CXX_FLAGS_RELWITHDEBINFO "${DEBUG_FLAGS} ${CXX_RELEASE_FLAGS}")
endif()

message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ set(ARROW_SRCS
tensor/csf_converter.cc
tensor/csx_converter.cc
type.cc
type_traits.cc
visitor.cc
c/bridge.cc
io/buffered.cc
Expand Down Expand Up @@ -414,6 +415,9 @@ list(APPEND
compute/kernels/util_internal.cc
compute/kernels/vector_hash.cc
compute/kernels/vector_selection.cc
compute/kernels/vector_selection_filter_internal.cc
compute/kernels/vector_selection_internal.cc
compute/kernels/vector_selection_take_internal.cc
compute/row/encode_internal.cc
compute/row/compare_internal.cc
compute/row/grouper.cc
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/acero/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ macro(append_acero_avx2_src SRC)
endmacro()

set(ARROW_ACERO_SRCS
groupby.cc
accumulation_queue.cc
aggregate_node.cc
asof_join_node.cc
Expand Down Expand Up @@ -166,7 +165,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
add_arrow_acero_test(groupby_test SOURCES groupby_test.cc)
add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc)
add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc)

Expand Down
15 changes: 14 additions & 1 deletion cpp/src/arrow/acero/aggregate_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

#include <vector>

#include "arrow/acero/groupby.h"
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/array/array_primitive.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
Expand Down Expand Up @@ -347,6 +348,18 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
}

/// \brief Run a grouped aggregation over a single record batch
///
/// The batch is wrapped in a one-batch table, which is then fed through a
/// "table_source" node followed by an "aggregate" node.
///
/// \param batch the input data
/// \param aggregates the aggregations handed to the aggregate node
/// \param keys the key fields handed to the aggregate node
/// \param use_threads forwarded unchanged to DeclarationToTable
/// \param memory_pool forwarded unchanged to DeclarationToTable
/// \return whatever DeclarationToTable yields for the plan; a failure to build
///         the intermediate table is returned early by ARROW_ASSIGN_OR_RAISE
Result<std::shared_ptr<Table>> BatchGroupBy(
    std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
    std::vector<FieldRef> keys, bool use_threads = false,
    MemoryPool* memory_pool = default_memory_pool()) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> input_table,
                        Table::FromRecordBatches({std::move(batch)}));

  // Build the options for each stage up front so the plan itself reads as a
  // plain list of (factory name, options) pairs.
  TableSourceNodeOptions source_options(std::move(input_table));
  AggregateNodeOptions aggregate_options(std::move(aggregates), std::move(keys));

  Declaration group_by_plan =
      Declaration::Sequence({{"table_source", std::move(source_options)},
                             {"aggregate", std::move(aggregate_options)}});
  return DeclarationToTable(std::move(group_by_plan), use_threads, memory_pool);
}

static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,32 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/acero/groupby.h"
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"

#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>

#include <memory>

#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"

namespace arrow {

namespace acero {

/// \brief Run a grouped aggregation over a table
///
/// The table is fed through a "table_source" node followed by an "aggregate"
/// node and the plan is executed with DeclarationToTable.
///
/// \param table the input data
/// \param aggregates the aggregations handed to the aggregate node
/// \param keys the key fields handed to the aggregate node
/// \param use_threads forwarded unchanged to DeclarationToTable
/// \param memory_pool forwarded unchanged to DeclarationToTable
/// \return whatever DeclarationToTable yields for the plan
Result<std::shared_ptr<Table>> TableGroupBy(
    std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
    std::vector<FieldRef> keys, bool use_threads = false,
    MemoryPool* memory_pool = default_memory_pool()) {
  // Build the options for each stage up front so the plan itself reads as a
  // plain list of (factory name, options) pairs.
  TableSourceNodeOptions source_options(std::move(table));
  AggregateNodeOptions aggregate_options(std::move(aggregates), std::move(keys));

  Declaration group_by_plan =
      Declaration::Sequence({{"table_source", std::move(source_options)},
                             {"aggregate", std::move(aggregate_options)}});
  return DeclarationToTable(std::move(group_by_plan), use_threads, memory_pool);
}

TEST(GroupByConvenienceFunc, Basic) {
std::shared_ptr<Schema> in_schema =
schema({field("key1", utf8()), field("key2", int32()), field("value", int32())});
Expand Down
41 changes: 37 additions & 4 deletions cpp/src/arrow/acero/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/io_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
#include "arrow/util/string.h"
Expand Down Expand Up @@ -358,10 +359,38 @@ std::optional<int> GetNodeIndex(const std::vector<ExecNode*>& nodes,
return std::nullopt;
}

// Name of the environment variable that selects the default policy for
// unaligned buffers. Declared constexpr so the pointer itself cannot be
// reassigned at runtime.
constexpr const char* kAceroAlignmentHandlingEnvVar = "ACERO_ALIGNMENT_HANDLING";

// Work out the default unaligned buffer policy from the environment.
//
// The variable named by kAceroAlignmentHandlingEnvVar is compared
// case-insensitively against "warn", "ignore", "reallocate" and "error".
// kWarn is returned when the lookup does not succeed, and also (after logging
// a warning) when the value matches none of the recognized names.
UnalignedBufferHandling DetermineDefaultUnalignedBufferHandling() {
  auto env_lookup = ::arrow::internal::GetEnvVar(kAceroAlignmentHandlingEnvVar);
  if (!env_lookup.ok()) {
    return UnalignedBufferHandling::kWarn;
  }

  std::string setting = env_lookup.MoveValueUnsafe();
  if (::arrow::internal::AsciiEqualsCaseInsensitive(setting, "warn")) {
    return UnalignedBufferHandling::kWarn;
  }
  if (::arrow::internal::AsciiEqualsCaseInsensitive(setting, "ignore")) {
    return UnalignedBufferHandling::kIgnore;
  }
  if (::arrow::internal::AsciiEqualsCaseInsensitive(setting, "reallocate")) {
    return UnalignedBufferHandling::kReallocate;
  }
  if (::arrow::internal::AsciiEqualsCaseInsensitive(setting, "error")) {
    return UnalignedBufferHandling::kError;
  }

  // Nothing matched: report the bad value and fall back to the default.
  ARROW_LOG(WARNING) << "unrecognized value for ACERO_ALIGNMENT_HANDLING: " << setting;
  return UnalignedBufferHandling::kWarn;
}

} // namespace

const uint32_t ExecPlan::kMaxBatchSize;

// Return the default unaligned buffer policy.
//
// The value is computed by DetermineDefaultUnalignedBufferHandling() when the
// function-local static is first initialized and is reused on later calls.
UnalignedBufferHandling GetDefaultUnalignedBufferHandling() {
  static const UnalignedBufferHandling cached_default =
      DetermineDefaultUnalignedBufferHandling();
  return cached_default;
}

Result<std::shared_ptr<ExecPlan>> ExecPlan::Make(
QueryOptions opts, ExecContext ctx,
std::shared_ptr<const KeyValueMetadata> metadata) {
Expand Down Expand Up @@ -621,7 +650,8 @@ Future<std::shared_ptr<Table>> DeclarationToTableImpl(
query_options.function_registry);
std::shared_ptr<std::shared_ptr<Table>> output_table =
std::make_shared<std::shared_ptr<Table>>();
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(query_options, exec_ctx));
TableSinkNodeOptions sink_options(output_table.get());
sink_options.sequence_output = query_options.sequence_output;
sink_options.names = std::move(query_options.field_names);
Expand All @@ -648,7 +678,8 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesImpl(
std::shared_ptr<Schema> out_schema;
AsyncGenerator<std::optional<ExecBatch>> sink_gen;
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(options, exec_ctx));
SinkNodeOptions sink_options(&sink_gen, &out_schema);
sink_options.sequence_output = options.sequence_output;
Declaration with_sink = Declaration::Sequence({declaration, {"sink", sink_options}});
Expand Down Expand Up @@ -678,7 +709,8 @@ Future<BatchesWithCommonSchema> DeclarationToExecBatchesImpl(
Future<> DeclarationToStatusImpl(Declaration declaration, QueryOptions options,
::arrow::internal::Executor* cpu_executor) {
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> exec_plan,
ExecPlan::Make(options, exec_ctx));
ARROW_ASSIGN_OR_RAISE(ExecNode * last_node, declaration.AddToPlan(exec_plan.get()));
if (!last_node->is_sink()) {
ConsumingSinkNodeOptions sink_options(NullSinkNodeConsumer::Make());
Expand Down Expand Up @@ -972,7 +1004,8 @@ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> DeclarationToRecordBatchGen
::arrow::internal::Executor* cpu_executor, std::shared_ptr<Schema>* out_schema) {
auto converter = std::make_shared<BatchConverter>();
ExecContext exec_ctx(options.memory_pool, cpu_executor, options.function_registry);
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make(exec_ctx));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
ExecPlan::Make(options, exec_ctx));
Declaration with_sink = Declaration::Sequence(
{declaration,
{"sink", SinkNodeOptions(&converter->exec_batch_gen, &converter->schema)}});
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/arrow/acero/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ struct ARROW_ACERO_EXPORT Declaration {
std::string label;
};

/// \brief Policy applied to source buffers that are not suitably aligned
enum class UnalignedBufferHandling {
  /// Check whether any buffers are unaligned and emit a warning if so
  kWarn,
  /// Do not check buffer alignment at all
  kIgnore,
  /// Copy the contents of an unaligned buffer into a new, suitably aligned buffer
  kReallocate,
  /// Gracefully abort the plan
  kError
};

/// \brief Get the default behavior of unaligned buffer handling
///
/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable, which
/// can be set to "warn", "ignore", "reallocate", or "error". If the environment
/// variable is not set, or is set to an invalid value, this will return kWarn.
UnalignedBufferHandling GetDefaultUnalignedBufferHandling();

/// \brief plan-wide options that can be specified when executing an execution plan
struct ARROW_ACERO_EXPORT QueryOptions {
/// \brief Should the plan use a legacy batching strategy
Expand Down Expand Up @@ -562,6 +572,36 @@ struct ARROW_ACERO_EXPORT QueryOptions {
///
/// If set then the number of names must equal the number of output columns
std::vector<std::string> field_names;

/// \brief Policy for unaligned buffers in source data
///
/// Various compute functions and acero internals will type pun array
/// buffers from uint8_t* to some kind of value type (e.g. we might
/// cast to int32_t* to add two int32 arrays)
///
/// If the buffer is poorly aligned (e.g. an int32 array is not aligned
/// on a 4-byte boundary) then this is technically undefined behavior in C++.
/// However, most modern compilers and CPUs are fairly tolerant of this
/// behavior and nothing bad (beyond a small hit to performance) is likely
/// to happen.
///
/// Note that this only applies to source buffers. All buffers allocated internally
/// by Acero will be suitably aligned.
///
/// If this field is set to kWarn then Acero will check if any buffers are unaligned
/// and, if they are, will emit a warning.
///
/// If this field is set to kReallocate then Acero will allocate a new, suitably aligned
/// buffer and copy the contents from the old buffer into this new buffer.
///
/// If this field is set to kError then Acero will gracefully abort the plan instead.
///
/// If this field is set to kIgnore then Acero will not even check if the buffers are
/// unaligned.
///
/// If this field is not set then it will be treated as kWarn unless overridden
/// by the ACERO_ALIGNMENT_HANDLING environment variable
std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
};

/// \brief Calculate the output schema of a declaration
Expand Down
61 changes: 0 additions & 61 deletions cpp/src/arrow/acero/groupby.cc

This file was deleted.

Loading

0 comments on commit b0fb9a5

Please sign in to comment.