Skip to content

Commit

Permalink
apacheGH-39769: [C++][Device] Fix Importing nested and string types f…
Browse files Browse the repository at this point in the history
…or DeviceArray (apache#39770)

### Rationale for this change
In my testing with libcudf and other GPU data, I discovered a deficiency in ImportDeviceArray and thus ImportDeviceRecordBatch where the device type and memory manager aren't propagated to child importers and it fails to import offset-based types such as strings.

### What changes are included in this PR?
These are relatively easily handled by first ensuring that `ImportChild` propagates the device_type and memory manager from the parent. Then for importing offset based values we merely need to use the memory manager to copy the final offset value to the CPU to use for the buffer size computation. 

This will work for any device which has implemented CopyBufferTo/From 

### Are these changes tested?
A new test is added to test these situations.

* Closes: apache#39769

Authored-by: Matt Topol <zotthewizard@gmail.com>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
zeroshade authored and zanmato1984 committed Feb 28, 2024
1 parent 55e0bea commit fb0d129
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
23 changes: 20 additions & 3 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,8 @@ struct ArrayImporter {
if (recursion_level_ >= kMaxImportRecursionLevel) {
return Status::Invalid("Recursion level in ArrowArray struct exceeded");
}
device_type_ = parent->device_type_;
memory_mgr_ = parent->memory_mgr_;
// Child buffers will keep the entire parent import alive.
// Perhaps we can move the child structs to an owned area
// when the parent ImportedArrayData::Release() gets called,
Expand Down Expand Up @@ -1857,10 +1859,25 @@ struct ArrayImporter {
template <typename OffsetType>
Status ImportStringValuesBuffer(int32_t offsets_buffer_id, int32_t buffer_id,
int64_t byte_width = 1) {
auto offsets = data_->GetValues<OffsetType>(offsets_buffer_id);
if (device_type_ == DeviceAllocationType::kCPU) {
auto offsets = data_->GetValues<OffsetType>(offsets_buffer_id);
// Compute visible size of buffer
int64_t buffer_size =
(c_struct_->length > 0) ? byte_width * offsets[c_struct_->length] : 0;
return ImportBuffer(buffer_id, buffer_size);
}

// we only need the value of the last offset so let's just copy that
// one value from device to host.
auto single_value_buf =
SliceBuffer(data_->buffers[offsets_buffer_id],
c_struct_->length * sizeof(OffsetType), sizeof(OffsetType));
ARROW_ASSIGN_OR_RAISE(
auto cpubuf, Buffer::ViewOrCopy(single_value_buf, default_cpu_memory_manager()));
auto offsets = cpubuf->data_as<OffsetType>();
// Compute visible size of buffer
int64_t buffer_size =
(c_struct_->length > 0) ? byte_width * offsets[c_struct_->length] : 0;
int64_t buffer_size = (c_struct_->length > 0) ? byte_width * offsets[0] : 0;

return ImportBuffer(buffer_id, buffer_size);
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/c/bridge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4320,6 +4320,16 @@ TEST_F(TestDeviceArrayRoundtrip, Primitive) {
TestWithJSON(mm, int32(), "[4, 5, null]");
}

TEST_F(TestDeviceArrayRoundtrip, Struct) {
std::shared_ptr<Device> device = std::make_shared<MyDevice>(1);
auto mm = device->default_memory_manager();
auto type = struct_({field("ints", int16()), field("strs", utf8())});

TestWithJSON(mm, type, "[]");
TestWithJSON(mm, type, R"([[4, "foo"], [5, "bar"]])");
TestWithJSON(mm, type, R"([[4, null], null, [5, "foo"]])");
}

////////////////////////////////////////////////////////////////////////////
// Array stream export tests

Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferFrom(
if (!from->is_cpu()) {
return nullptr;
}
// in this case the memory manager we're coming from is visible on the CPU,
// but uses an allocation type other than CPU. Since we know the data is visible
// to the CPU a "View" of this should use the CPUMemoryManager as the listed memory
// manager.
if (buf->device_type() != DeviceAllocationType::kCPU) {
return std::make_shared<Buffer>(buf->address(), buf->size(), shared_from_this(), buf);
}
return buf;
}

Expand All @@ -220,6 +227,13 @@ Result<std::shared_ptr<Buffer>> CPUMemoryManager::ViewBufferTo(
if (!to->is_cpu()) {
return nullptr;
}
// in this case the memory manager we're coming from is visible on the CPU,
// but uses an allocation type other than CPU. Since we know the data is visible
// to the CPU a "View" of this should use the CPUMemoryManager as the listed memory
// manager.
if (buf->device_type() != DeviceAllocationType::kCPU) {
return std::make_shared<Buffer>(buf->address(), buf->size(), to, buf);
}
return buf;
}

Expand Down

0 comments on commit fb0d129

Please sign in to comment.