Skip to content

Commit

Permalink
PARQUET-827: Account for arrow::MemoryPool API change and fix bug in …
Browse files Browse the repository at this point in the history
…reading Int96 timestamps

Prior to ARROW-427, the "length()" field was not being properly checked in the implementation of `PrimitiveArray::Equals`.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#215 from wesm/PARQUET-825 and squashes the following commits:

afa41f9 [Wes McKinney] Do not build arrow_jemalloc in arrow external project
ff9f22e [Wes McKinney] cpplint, clang-format
802b325 [Wes McKinney] Update Arrow version
5f155b6 [Wes McKinney] Fix bug exposed by accidental bug fix in ARROW-427
a3c75bb [Wes McKinney] Implement TrackingAllocator based on Arrow default allocator
  • Loading branch information
wesm committed Sep 2, 2018
1 parent 7c66f55 commit 0daa871
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 43 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96T
for (int64_t i = 0; i < values_read; i++) {
out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
}
valid_bits_idx_ += values_read;
}

template <>
Expand Down
12 changes: 4 additions & 8 deletions cpp/src/parquet/schema/descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,10 @@ void SchemaDescriptor::Init(const NodePtr& schema) {
}

bool SchemaDescriptor::Equals(const SchemaDescriptor& other) const {
if (this->num_columns() != other.num_columns()) {
return false;
}
if (this->num_columns() != other.num_columns()) { return false; }

for (int i = 0; i < this->num_columns(); ++i) {
if (!this->Column(i)->Equals(*other.Column(i))) {
return false;
}
if (!this->Column(i)->Equals(*other.Column(i))) { return false; }
}

return true;
Expand Down Expand Up @@ -98,8 +94,8 @@ ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,

bool ColumnDescriptor::Equals(const ColumnDescriptor& other) const {
return primitive_node_->Equals(other.primitive_node_) &&
max_repetition_level() == other.max_repetition_level() &&
max_definition_level() == other.max_definition_level();
max_repetition_level() == other.max_repetition_level() &&
max_definition_level() == other.max_definition_level();
}

const ColumnDescriptor* SchemaDescriptor::Column(int i) const {
Expand Down
16 changes: 6 additions & 10 deletions cpp/src/parquet/schema/schema-descriptor-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,30 +89,26 @@ TEST_F(TestSchemaDescriptor, Equals) {
NodePtr bag2(GroupNode::Make("bag", Repetition::REQUIRED, {list}));

SchemaDescriptor descr1;
descr1.Init(GroupNode::Make("schema", Repetition::REPEATED,
{inta, intb, intc, bag}));
descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag}));

ASSERT_TRUE(descr1.Equals(descr1));

SchemaDescriptor descr2;
descr2.Init(GroupNode::Make("schema", Repetition::REPEATED,
{inta, intb, intc, bag2}));
descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag2}));
ASSERT_FALSE(descr1.Equals(descr2));

SchemaDescriptor descr3;
descr3.Init(GroupNode::Make("schema", Repetition::REPEATED,
{inta, intb2, intc, bag}));
descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb2, intc, bag}));
ASSERT_FALSE(descr1.Equals(descr3));

// Robust to name of parent node
SchemaDescriptor descr4;
descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED,
{inta, intb, intc, bag}));
descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, {inta, intb, intc, bag}));
ASSERT_TRUE(descr1.Equals(descr4));

SchemaDescriptor descr5;
descr5.Init(GroupNode::Make("schema", Repetition::REPEATED,
{inta, intb, intc, bag, intb2}));
descr5.Init(
GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag, intb2}));
ASSERT_FALSE(descr1.Equals(descr5));

// Different max repetition / definition levels
Expand Down
39 changes: 22 additions & 17 deletions cpp/src/parquet/util/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <cstdio>
#include <string>

#include "arrow/status.h"

#include "parquet/exception.h"
#include "parquet/types.h"
#include "parquet/util/bit-util.h"
Expand All @@ -30,31 +32,34 @@
namespace parquet {

::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) {
if (0 == size) {
if (size == 0) {
*out = nullptr;
return ::arrow::Status::OK();
}
ARROW_RETURN_NOT_OK(allocator_->Allocate(size, out));
const int64_t total_memory = allocator_->bytes_allocated();
if (total_memory > max_memory_) { max_memory_ = total_memory; }
return ::arrow::Status::OK();
}

uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
if (!p) { return ::arrow::Status::OutOfMemory("memory allocation failed"); }
{
std::lock_guard<std::mutex> lock(stats_mutex_);
total_memory_ += size;
if (total_memory_ > max_memory_) { max_memory_ = total_memory_; }
}
*out = p;
::arrow::Status TrackingAllocator::Reallocate(
int64_t old_size, int64_t new_size, uint8_t** out) {
ARROW_RETURN_NOT_OK(allocator_->Reallocate(old_size, new_size, out));
const int64_t total_memory = allocator_->bytes_allocated();
if (total_memory > max_memory_) { max_memory_ = total_memory; }
return ::arrow::Status::OK();
}

void TrackingAllocator::Free(uint8_t* p, int64_t size) {
if (nullptr != p && size > 0) {
{
std::lock_guard<std::mutex> lock(stats_mutex_);
DCHECK_GE(total_memory_, size) << "Attempting to free too much memory";
total_memory_ -= size;
}
std::free(p);
}
allocator_->Free(p, size);
}

int64_t TrackingAllocator::max_memory() const {
return max_memory_.load();
}

int64_t TrackingAllocator::bytes_allocated() const {
return allocator_->bytes_allocated();
}

MemoryAllocator* default_allocator() {
Expand Down
16 changes: 8 additions & 8 deletions cpp/src/parquet/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
#ifndef PARQUET_UTIL_MEMORY_H
#define PARQUET_UTIL_MEMORY_H

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -72,19 +72,19 @@ PARQUET_EXPORT MemoryAllocator* default_allocator();

class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
public:
TrackingAllocator() : total_memory_(0), max_memory_(0) {}
explicit TrackingAllocator(MemoryAllocator* allocator = ::arrow::default_memory_pool())
: allocator_(allocator), max_memory_(0) {}

::arrow::Status Allocate(int64_t size, uint8_t** out) override;
::arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
void Free(uint8_t* p, int64_t size) override;

int64_t bytes_allocated() const override { return total_memory_; }

int64_t max_memory() { return max_memory_; }
int64_t max_memory() const;
int64_t bytes_allocated() const override;

private:
std::mutex stats_mutex_;
int64_t total_memory_;
int64_t max_memory_;
MemoryAllocator* allocator_;
std::atomic<int64_t> max_memory_;
};

template <class T>
Expand Down

0 comments on commit 0daa871

Please sign in to comment.